# API Reference

All functions and classes from aiochan.channel can also be imported from the top-level aiochan module.

## Channel

class aiochan.channel.Chan(buffer=None, buffer_size=None, *, loop=None, name=None)

A channel, the basic construct in CSP-style concurrency.

Channels can be used as async generators using the async for construct for async iteration of the values.

Parameters:
- buffer – if an aiochan.buffers.AbstractBuffer() is given, it is used as the buffer, and buffer_size has no effect. If an integer is given, an aiochan.buffers.FixedLengthBuffer() of that size is created and used. If one of the string values f, d, s or p is given, an aiochan.buffers.FixedLengthBuffer(), aiochan.buffers.DroppingBuffer(), aiochan.buffers.SlidingBuffer() or aiochan.buffers.PromiseBuffer(), respectively, is created and used, with its size given by the parameter buffer_size.
- buffer_size – see the doc for buffer.
- loop – the asyncio loop that should be used when scheduling and creating futures. If None, the current loop is used. If the special string value "no_loop" is given, no loop is used at all. Even in this case the channel can operate if you use only aiochan.channel.Chan.get_nowait() and aiochan.channel.Chan.put_nowait().
- name – used to provide friendlier debugging output.

put(val)

Coroutine. Put a value into the channel.

Parameters: val – value to put into the channel. Cannot be None.

Returns: an awaitable of True if the op succeeds before the channel is closed, or of False if the op is applied to a then-closed channel.
put_nowait(val, cb=None, *, immediate_only=True)

Put val into the channel synchronously.

If immediate_only is True, the operation will not be queued if it cannot complete immediately.

When immediate_only is False, cb can optionally be provided. It will be called when the put op eventually completes, with a single argument, True or False, depending on whether the channel is closed at the time the put op completes. cb cannot be supplied when immediate_only is True.

Returns True if the put succeeds immediately, False if the channel is already closed, None if the operation is queued.

add(*vals)

Convenience method for putting many elements into the channel. The put semantics are the same as aiochan.channel.Chan.put_nowait() with immediate_only=False.

Note that this method can potentially overflow the channel’s put queue, so it is only suitable for adding a small number of elements.

Parameters: vals – values to add, none of which can be None.

Returns: self
get()

Coroutine. Get a value out of the channel.

Returns: An awaitable holding the obtained value, or None if the channel is closed before the get succeeds.
get_nowait(cb=None, *, immediate_only=True)

Try to get a value from the channel without waiting.

Parameters:
- cb – a callback to execute, passing in the eventual value of the get operation, which is None if the channel becomes closed before a value is available. Cannot be supplied when immediate_only is True. Note that if cb is supplied, it will be executed even when the value IS immediately available and returned by the function.
- immediate_only – do not queue the get operation if it cannot be completed immediately.

Returns: the value if available immediately, None otherwise.

close()

Close the channel.

After this method is called, further puts to this channel will complete immediately without doing anything. Further gets will yield values in pending puts or buffer. After pending puts and buffer are both drained, gets will complete immediately with None as the result.

Closing an already closed channel is a no-op.

Returns: self
closed
Returns: whether this channel is already closed.
join()

Coroutine. Wait for the channel to be closed and completely exhausted.

Returns: An awaitable that will yield when the channel becomes both closed and exhausted (i.e., no buffer, no pending puts).

stats()

Get the current stats of the channel. Useful for determining bottlenecks and debugging back pressure in a processing pipeline.

Returns: a ChanStat object cs. cs.state is 'PENDING_PUTS', 'PENDING_GETS' or 'FLUENT' according to whether the channel is currently blocked on puts, blocked on gets, or not blocked (either because no operation is going on or because buffer space is available). cs.buffered, cs.queued and cs.immediate count how many values have been delivered, according to whether the getter was given a buffered value, the getter was queued, or the getter obtained the value immediately from a pending putter.
async_apply(f=<function Chan._pipe_worker>, out=None, buffer=None, buffer_size=None)

Apply a coroutine function to the values in the channel, putting an arbitrary number of results into the output channel, and return the output channel.

Parameters:
- f – a coroutine function taking two channels, inp and out. inp is the current channel and out is the given or newly created out channel. The coroutine function should take elements from inp, do its processing, and put the processed values into out. When, how often and whether values are put into out, and when or whether out is ever closed, is up to the coroutine. If f is not given, an identity coroutine function is used, which just passes the values along and closes out when inp is closed.
- out – the out channel given to the coroutine function f. If None, a new channel with no buffer will be created.
- buffer – buffer of the internal channel; only applies if out is None.
- buffer_size – buffer_size of the internal channel; only applies if out is None.

Returns: the out channel.
async_pipe(n, f, out=None, buffer=None, buffer_size=None, *, close=True)

Asynchronously apply the coroutine function f to each value in the channel, and pipe the results to out. The results will be processed in unspecified order but will be piped into out in the order of their inputs.

If f involves slow or blocking operation, consider using parallel_pipe.

If ordering is not important, consider using async_pipe_unordered.

Parameters:
- n – how many coroutines to spawn for processing.
- f – a coroutine function accepting one input value and returning one output value. Should never return None.
- out – the output channel. If None, one without buffer will be created and used.
- buffer – buffer of the internal channel; only applies if out is None.
- buffer_size – buffer_size of the internal channel; only applies if out is None.
- close – whether to close the output channel when the input channel is closed.

Returns: the output channel.
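
The ordering guarantee described above can be illustrated without aiochan. The following is a minimal sketch using only asyncio, not the library's implementation: `ordered_pipe` and `slow_double` are hypothetical names, a semaphore stands in for the n workers, and a plain list stands in for the channel.

```python
import asyncio


async def slow_double(x):
    # Later inputs sleep less, so completion order differs from input order.
    await asyncio.sleep(0.03 - 0.01 * x)
    return x * 2


async def ordered_pipe(n, f, values):
    # Hypothetical stand-in for an ordered pipe: at most n calls of f run at once.
    sem = asyncio.Semaphore(n)

    async def worker(v):
        async with sem:
            return await f(v)

    # gather() returns results in the order of its arguments,
    # not in the order in which they complete.
    return await asyncio.gather(*(worker(v) for v in values))


results = asyncio.run(ordered_pipe(2, slow_double, [0, 1, 2]))
print(results)  # [0, 2, 4]
```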
async_pipe_unordered(n, f, out=None, buffer=None, buffer_size=None, *, close=True)

Asynchronously apply the coroutine function f to each value in the channel, and pipe the results to out. The results will be put into out in an unspecified order: whichever result completes first will be given first.

If f involves slow or blocking operation, consider using parallel_pipe_unordered.

If ordering is important, consider using async_pipe.

Parameters:
- n – how many coroutines to spawn for processing.
- f – a coroutine function accepting one input value and returning one output value. Should never return None.
- out – the output channel. If None, one without buffer will be created and used.
- buffer – buffer of the internal channel; only applies if out is None.
- buffer_size – buffer_size of the internal channel; only applies if out is None.
- close – whether to close the output channel when the input channel is closed.

Returns: the output channel.
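
For contrast, "whichever result completes first will be given first" can be sketched with asyncio.as_completed. This is an illustration only, not how aiochan implements it: `unordered_pipe` and `slow_double` are hypothetical names, and the parallelism limit n is left out for brevity.

```python
import asyncio


async def slow_double(x):
    # Later inputs sleep less, so they tend to finish first.
    await asyncio.sleep(0.03 - 0.01 * x)
    return x * 2


async def unordered_pipe(f, values):
    results = []
    # as_completed() yields awaitables in the order in which they finish.
    for fut in asyncio.as_completed([f(v) for v in values]):
        results.append(await fut)
    return results


results = asyncio.run(unordered_pipe(slow_double, [0, 1, 2]))
# The same values come out, but their order depends on timing.
print(sorted(results))  # [0, 2, 4]
```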
parallel_pipe(n, f, out=None, buffer=None, buffer_size=None, close=True, flatten=False, mode='process', mp_module=multiprocessing, pool_args=None, pool_kwargs=None, error_cb=None, pool_buffer=None)

Apply the plain function f to each value in the channel, and pipe the results to out. The function f will be run in a pool executor with parallelism n. The results will be put into out in the order in which their arguments arrive.

Note that even in the presence of GIL, thread mode is usually sufficient for achieving the greatest parallelism: the overhead is much lower than process mode, and many blocking or slow operations (e.g. file operations, network operations, numpy computations) actually release the GIL.

If f involves no blocking or slow operation, consider using async_pipe.

If ordering is not important, consider using parallel_pipe_unordered.

Parameters:
- n – the parallelism of the pool executor (number of threads or number of processes).
- f – a plain function accepting one input value and returning one output value. Should never return None.
- out – the output channel. If None, one without buffer will be created and used.
- buffer – buffer of the internal channel; only applies if out is None.
- buffer_size – buffer_size of the internal channel; only applies if out is None.
- mode – if thread, a ThreadPoolExecutor will be used; if process, a Pool will be used.
- close – whether to close the output channel when the input channel is closed.
- flatten – if True, assume f returns a sequence and put the individual elements of the sequence onto the output channel instead.
- mp_module – when mode='process', you can optionally pass in a compatible multiprocessing module (for example, torch.multiprocessing from pytorch).
- pool_args – additional arguments when creating the pool.
- pool_kwargs – additional keyword arguments when creating the pool.
- error_cb – callback in case there is an error.
- pool_buffer – the number of jobs that can be over-committed to the pool.

Returns: the output channel.
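
The remark about thread mode can be tried with the standard library alone. The sketch below does not use aiochan: it runs a hypothetical blocking function `slow_square` in a ThreadPoolExecutor, whose map() also happens to return results in input order, much like the ordered pipe described here.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    # time.sleep releases the GIL, as many blocking I/O calls do,
    # so the three calls overlap even in thread mode.
    time.sleep(0.03 - 0.01 * x)
    return x * x


with ThreadPoolExecutor(max_workers=3) as pool:
    # Executor.map yields results in input order, regardless of
    # which call finishes first.
    squares = list(pool.map(slow_square, [0, 1, 2]))

print(squares)  # [0, 1, 4]
```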
parallel_pipe_unordered(n, f, out=None, buffer=None, buffer_size=None, close=True, flatten=False, mode='process', mp_module=multiprocessing, pool_args=None, pool_kwargs=None, error_cb=None, pool_buffer=None)

Apply the plain function f to each value in the channel, and pipe the results to out. The function f will be run in a pool with parallelism n. The results will be put into out in an unspecified order: whichever result completes first will be given first.

Note that even in the presence of GIL, thread mode is usually sufficient for achieving the greatest parallelism: the overhead is much lower than process mode, and many blocking or slow operations (e.g. file operations, network operations, numpy computations) actually release the GIL.

If f involves no blocking or slow operation, consider using async_pipe_unordered.

If ordering is important, consider using parallel_pipe.

Parameters:
- n – the parallelism of the pool executor (number of threads or number of processes).
- f – a plain function accepting one input value and returning one output value. Should never return None.
- out – the output channel. If None, one without buffer will be created and used.
- buffer – buffer of the internal channel; only applies if out is None.
- buffer_size – buffer_size of the internal channel; only applies if out is None.
- mode – if thread, a ThreadPoolExecutor will be used; if process, a Pool will be used.
- close – whether to close the output channel when the input channel is closed.
- flatten – if True, assume f returns a sequence and put the individual elements of the sequence onto the output channel instead.
- mp_module – when mode='process', you can optionally pass in a compatible multiprocessing module (for example, torch.multiprocessing from pytorch).
- pool_args – additional arguments when creating the pool.
- pool_kwargs – additional keyword arguments when creating the pool.
- error_cb – callback in case there is an error.
- pool_buffer – the number of jobs that can be over-committed to the pool.

Returns: the output channel.
to_queue(q)

Put elements from the channel onto the given queue. Useful for inter-thread communication.

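To be useful at all, this method should be called before running the asyncio loop:

```python
import asyncio
import queue
import aiochan as ac

loop = asyncio.new_event_loop()
chan = ac.Chan(loop=loop)
q = chan.to_queue(queue.Queue())  # assumes a thread-safe queue.Queue is acceptable
# do something with the queue
```

Parameters: q – the queue.

Returns: the queue q.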
to_iterable(buffer_size=1)

Return an iterable containing the values in the channel.

This method is a convenience provided expressly for inter-thread usage. Typically, we will have an asyncio loop on a background thread producing values, and this method can be used as an escape hatch to transport the produced values back to the main thread.

If your workflow consists entirely of operations within the asyncio loop, you should use the channel as an async generator directly: async for val in ch: ....

To be useful at all, this method should be called before running the asyncio loop:

```python
import asyncio
import aiochan as ac

loop = asyncio.new_event_loop()
chan = ac.Chan(loop=loop)
it = chan.to_iterable()

for item in it:
    ...  # do something with the item
```

Parameters: buffer_size – buffering between the iterable and the channel.

Returns: the iterable.
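
The inter-thread pattern described above (an asyncio loop on a background thread producing values, the main thread consuming them) can be sketched with the standard library alone. This is not how to_iterable is implemented: the `produce` coroutine, the `_DONE` sentinel and the use of queue.Queue are assumptions made for illustration.

```python
import asyncio
import queue
import threading

_DONE = object()  # hypothetical sentinel marking the end of the stream


async def produce(q):
    # Runs inside the event loop on the background thread.
    for i in range(3):
        await asyncio.sleep(0)
        q.put(i)
    q.put(_DONE)


q = queue.Queue()
worker = threading.Thread(target=lambda: asyncio.run(produce(q)))
worker.start()

# Main thread: block on the queue until the sentinel arrives.
items = list(iter(q.get, _DONE))
worker.join()
print(items)  # [0, 1, 2]
```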
map(f, *, out=None, buffer=None, buffer_size=None, close=True, flatten=False)

Returns a channel containing f(v) for values v from the channel.

Parameters:
- f – a function receiving one element and returning one element. Cannot return None.
- out – the output channel. If None, one with no buffering will be created.
- buffer – buffer of the internal channel; only applies if out is None.
- buffer_size – buffer_size of the internal channel; only applies if out is None.
- close – whether out should be closed when there are no more values to be produced.
- flatten – if True, assume f returns a sequence and put the individual elements of the sequence onto the output channel instead.

Returns: the output channel.
filter(p, *, out=None, buffer=None, buffer_size=None, close=True)

Returns a channel containing values v from the channel for which p(v) is true.

Parameters:
- p – a function receiving one element and returning whether this value should be kept.
- out – the output channel. If None, one with no buffering will be created.
- buffer – buffer of the internal channel; only applies if out is None.
- buffer_size – buffer_size of the internal channel; only applies if out is None.
- close – whether out should be closed when there are no more values to be produced.

Returns: the output channel.
take(n, *, out=None, buffer=None, buffer_size=None, close=True)

Returns a channel containing at most n values from the channel.

Parameters:
- n – how many values to take.
- out – the output channel. If None, one with no buffering will be created.
- buffer – buffer of the internal channel; only applies if out is None.
- buffer_size – buffer_size of the internal channel; only applies if out is None.
- close – whether out should be closed when there are no more values to be produced.

Returns: the output channel.
drop(n, *, out=None, buffer=None, buffer_size=None, close=True)

Returns a channel containing values from the channel except the first n values.

Parameters:
- n – how many values to drop.
- out – the output channel. If None, one with no buffering will be created.
- buffer – buffer of the internal channel; only applies if out is None.
- buffer_size – buffer_size of the internal channel; only applies if out is None.
- close – whether out should be closed when there are no more values to be produced.

Returns: the output channel.
take_while(p, *, out=None, buffer=None, buffer_size=None, close=True)

Returns a channel containing values v from the channel until p(v) becomes false.

Parameters:
- p – a function receiving one element and returning whether this value should be kept.
- out – the output channel. If None, one with no buffering will be created.
- buffer – buffer of the internal channel; only applies if out is None.
- buffer_size – buffer_size of the internal channel; only applies if out is None.
- close – whether out should be closed when there are no more values to be produced.

Returns: the output channel.
drop_while(p, *, out=None, buffer=None, buffer_size=None, close=True)

Returns a channel containing values v from the channel after p(v) becomes false for the first time.

Parameters:
- p – a function receiving one element and returning whether this value should be dropped.
- out – the output channel. If None, one with no buffering will be created.
- buffer – buffer of the internal channel; only applies if out is None.
- buffer_size – buffer_size of the internal channel; only applies if out is None.
- close – whether out should be closed when there are no more values to be produced.

Returns: the output channel.
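
For readers who know itertools, take, drop, take_while and drop_while behave roughly like the following list-based analogues. This is a sketch on plain lists rather than channels, and it assumes that drop_while keeps the element on which p first fails (as itertools.dropwhile does); the description above does not settle that detail.

```python
from itertools import dropwhile, islice, takewhile

values = [1, 2, 3, 4, 1]

taken = list(islice(values, 2))                  # analogue of take(2)
dropped = list(islice(values, 2, None))          # analogue of drop(2)
head = list(takewhile(lambda v: v < 3, values))  # analogue of take_while(p)
tail = list(dropwhile(lambda v: v < 3, values))  # analogue of drop_while(p)

print(taken)    # [1, 2]
print(dropped)  # [3, 4, 1]
print(head)     # [1, 2]
print(tail)     # [3, 4, 1]
```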
group(n, out=None, buffer=None, buffer_size=None, close=True)

Returns a channel containing the elements of the source channel grouped into batches of size n (the last batch may be smaller than n).

Parameters:
- n – the size of the batch.
- out – the output channel. If None, one with no buffering will be created.
- buffer – buffer of the internal channel; only applies if out is None.
- buffer_size – buffer_size of the internal channel; only applies if out is None.
- close – whether out should be closed when there are no more values to be produced.

Returns: the output channel.
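
The batching behaviour, including the shorter final batch, can be sketched on a plain iterable. `batches` is a hypothetical helper, and representing each batch as a list is an assumption; the description above does not say which container aiochan uses.

```python
def batches(values, n):
    """Yield consecutive batches of size n; the last one may be shorter."""
    batch = []
    for v in values:
        batch.append(v)
        if len(batch) == n:
            yield batch
            batch = []
    if batch:
        yield batch  # leftover elements form the final, smaller batch


grouped = list(batches(range(5), 2))
print(grouped)  # [[0, 1], [2, 3], [4]]
```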
group_by(f, out=None, buffer=None, buffer_size=None, close=True)

Returns a channel containing (group_key, [elements…]) where group_key is the result of f applied to elements of the source channel and elements … are consecutive elements with the same group_key.

Parameters:
- f – the key function.
- out – the output channel. If None, one with no buffering will be created.
- buffer – buffer of the internal channel; only applies if out is None.
- buffer_size – buffer_size of the internal channel; only applies if out is None.
- close – whether out should be closed when there are no more values to be produced.

Returns: the output channel.
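
Because only consecutive elements with the same key are grouped, the same key can appear more than once in the output. itertools.groupby has the same property, so it serves as a rough list-based analogue (a sketch, not aiochan code; the word list is made up):

```python
from itertools import groupby

words = ["apple", "avocado", "banana", "apricot"]

# Group consecutive words by their first letter.
grouped = [(key, list(items)) for key, items in groupby(words, key=lambda w: w[0])]

print(grouped)
# [('a', ['apple', 'avocado']), ('b', ['banana']), ('a', ['apricot'])]
```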
distinct(*, out=None, buffer=None, buffer_size=None, close=True)

Returns a channel containing distinct values from the channel (consecutive duplicates are dropped).

Parameters:
- out – the output channel. If None, one with no buffering will be created.
- buffer – buffer of the internal channel; only applies if out is None.
- buffer_size – buffer_size of the internal channel; only applies if out is None.
- close – whether out should be closed when there are no more values to be produced.

Returns: the output channel.
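
Note that only consecutive duplicates are dropped, so a value can reappear later in the output. A list-based sketch of that behaviour (illustrative only, using itertools.groupby rather than aiochan):

```python
from itertools import groupby

values = [1, 1, 2, 2, 2, 1, 3, 3]

# groupby() without a key groups runs of equal consecutive values;
# keeping one key per run drops the consecutive duplicates.
deduped = [key for key, _ in groupby(values)]

print(deduped)  # [1, 2, 1, 3]
```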
reduce(f, init=None, *, out=None, buffer=None, buffer_size=None, close=True)

Returns a channel containing the single value that is the reduce (i.e. left-fold) of the values in the channel.

Parameters:
- f – a function taking two arguments accumulator and next_value and returning new_accumulator.
- init – if given, will be used as the initial accumulator. If not given, the first element in the channel will be used instead.
- out – the output channel. If None, one with no buffering will be created.
- close – whether out should be closed when there are no more values to be produced.

Returns: the output channel.
scan(f, init=None, *, out=None, buffer=None, buffer_size=None, close=True)

Similar to reduce, but all intermediate accumulators are put onto the out channel in order as well.

Parameters:
- f – a function taking two arguments accumulator and next_value and returning new_accumulator.
- init – if given, will be used as the initial accumulator. If not given, the first element in the channel will be used instead.
- out – the output channel. If None, one with no buffering will be created.
- buffer – buffer of the internal channel; only applies if out is None.
- buffer_size – buffer_size of the internal channel; only applies if out is None.
- close – whether out should be closed when there are no more values to be produced.

Returns: the output channel.
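
The relationship between reduce and scan mirrors that between functools.reduce and itertools.accumulate. The sketch below works on a plain list with no init given; whether aiochan's scan also emits the initial accumulator itself is not stated above, so treat the exact output as an assumption.

```python
from functools import reduce
from itertools import accumulate
from operator import add

values = [1, 2, 3, 4]

total = reduce(add, values)              # single final value, like reduce
running = list(accumulate(values, add))  # every intermediate value, like scan

print(total)    # 10
print(running)  # [1, 3, 6, 10]
```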
dup()

Create an aiochan.channel.Dup() from the channel.

Returns: the duplicator
pub(topic_fn=operator.itemgetter(0), buffer=None, buffer_size=None)

Create an aiochan.channel.Pub() from the channel.

Returns: the publisher
distribute(*outs, close=True)

Distribute the items in this channel to the output channels. Values will not be “lost” due to being put to closed channels.

Parameters:
- outs – the output channels.
- close – whether to close the output channels when the input closes.

Returns: self
collect(n=None)

Coroutine. Collect the elements in the channel into a list and return the list.

Parameters: n – if given, take at most n elements from the channel; otherwise take until the channel is closed.

Returns: an awaitable containing the collected values.
aiochan.channel.tick_tock(seconds, start_at=None, loop=None)

Returns a channel that gives out a value at a fixed interval, given in seconds by the parameter seconds.

The channel contains tuples: the first element is a number starting from 1, counting how many ticks have passed, and the second element is the time at which the tuple was generated.

Parameters:
- start_at – if None, the first tick occurs seconds later. If given, the first tick occurs at the given time (as a float).
- seconds – time interval of the ticks.
- loop – you can optionally specify the loop on which the returned channel is intended to be used.

Returns: the tick channel.
aiochan.channel.timeout(seconds, loop=None)

Returns a channel that closes itself after the given number of seconds.

Parameters:
- seconds – time before the channel is closed.
- loop – you can optionally specify the loop on which the returned channel is intended to be used.

Returns: the timeout channel.
aiochan.channel.from_iter(it, *, loop=None)

Convert an iterable into a channel.

The channel will be closed on creation, but gets will succeed until the iterable is exhausted.

It is ok for the iterable to be unbounded.

Parameters:
- it – the iterable to convert.
- loop – you can optionally specify the loop on which the returned channel is intended to be used.

Returns: the converted channel.
aiochan.channel.from_range(start=None, end=None, step=None, *, loop=None)

Returns a channel that gives out consecutive numerical values.

If start is None, then the count goes from 0 to the maximum number that Python can count.

If start and step are given, then the values are produced as if by itertools.count.

Otherwise the values are produced as if by range.

Parameters: loop – you can optionally specify the loop on which the returned channel is intended to be used.

Returns: the range channel.
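
The three cases can be pictured with their standard-library counterparts. This is only a sketch of the value sequences on plain iterables; it assumes that the itertools.count case applies when end is left out, which the description above implies but does not state.

```python
from itertools import count, islice

# start is None: count upwards from 0 (only the first three values shown).
unbounded = list(islice(count(), 3))

# start and step given: as if by itertools.count(start, step).
stepped = list(islice(count(5, 2), 3))

# otherwise: as if by range(start, end).
bounded = list(range(1, 4))

print(unbounded)  # [0, 1, 2]
print(stepped)    # [5, 7, 9]
print(bounded)    # [1, 2, 3]
```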
aiochan.channel.select(*chan_ops, priority=False, default=None, cb=None, loop=None)

Asynchronously completes at most one operation in chan_ops.

Parameters:
- chan_ops – operations, each of which is either a channel, on which a get operation is attempted, or a tuple (chan, val), with which a put operation is attempted.
- priority – if True, the operations will be tried serially, else the order is random.
- default – if not None, do not queue the operations if they cannot be completed immediately; instead return a future containing SelectResult(val=default, chan=None).
- cb –
- loop – asyncio loop to run on.

Returns: a future containing SelectResult(val=result, chan=succeeded_chan).
aiochan.channel.merge(*inputs, out=None, buffer=None, buffer_size=None, close=True)

Merge the elements of the input channels into a single channel containing the individual values from the inputs.

Parameters:
- inputs – the input channels.
- out – the output channel. If None, a new unbuffered channel will be used.
- buffer – buffer of the internal channel; only applies if out is None.
- buffer_size – buffer_size of the internal channel; only applies if out is None.
- close – whether to close out when all inputs are closed.

Returns: the output channel.
aiochan.channel.zip_chans(*inputs, out=None, buffer=None, buffer_size=None, close=True)

Merge the elements of the input channels into a single channel containing lists of individual values from the inputs. The input values are consumed in lockstep.

Parameters:
- inputs – the input channels.
- out – the output channel. If None, a new unbuffered channel will be used.
- buffer – buffer of the internal channel; only applies if out is None.
- buffer_size – buffer_size of the internal channel; only applies if out is None.
- close – whether to close out when all inputs are closed.

Returns: the output channel.
aiochan.channel.combine_latest(*inputs, out=None, buffer=None, buffer_size=None, close=True)

Merge the elements of the input channels into a single channel containing lists of individual values from the inputs. The input values are consumed individually, and each time a new value is consumed from any input, a list containing the latest values from all channels is put onto the output. In the list, channels that have not yet produced any value have their corresponding entries set to None.

Parameters:
- inputs – the input channels.
- out – the output channel. If None, a new unbuffered channel will be used.
- buffer – buffer of the internal channel; only applies if out is None.
- buffer_size – buffer_size of the internal channel; only applies if out is None.
- close – whether to close out when all inputs are closed.

Returns: the output channel.
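
The "latest values" rule is easiest to see on a fixed sequence of arrivals. The following sketch is not aiochan code: it models the inputs as a hypothetical list of (input_index, value) events in arrival order and produces the snapshots that the description above implies.

```python
def combine_latest(n_inputs, events):
    """events: iterable of (input_index, value) pairs in arrival order."""
    latest = [None] * n_inputs  # None marks inputs that have produced nothing yet
    for idx, value in events:
        latest[idx] = value
        yield list(latest)  # copy, so later updates do not alter earlier snapshots


events = [(0, "a"), (1, 1), (0, "b")]
snapshots = list(combine_latest(2, events))

print(snapshots)  # [['a', None], ['a', 1], ['b', 1]]
```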
class aiochan.channel.Dup(inp)

A duplicator: takes values from the input, and gives out the same value to all outputs.

Note that duplication is performed in lockstep: if any of the outputs blocks on put, the whole operation will block. Thus the outputs should use some buffering as appropriate for the situation.

When there are no output channels, values from the input channel are dropped.

Parameters: inp – the input channel
inp
Returns: the input channel
tap(out=None, buffer=None, buffer_size=None, close=True)

Add a channel to the duplicator to receive duplicated values from the input.

Parameters:
- out – the channel to add. If None, an unbuffered channel will be created.
- buffer – buffer of the internal channel; only applies if out is None.
- buffer_size – buffer_size of the internal channel; only applies if out is None.
- close – whether to close the added channel when the input is closed.

Returns: the output channel.
untap(out)

Remove an output channel from the duplicator so that it no longer receives values from the input.

Parameters: out – the channel to remove.

Returns: the removed channel.
untap_all()

Remove all output channels from the duplicator.

Returns: self
close()

Close the duplicator.

Returns: self
class aiochan.channel.Pub(inp, *, topic_fn=operator.itemgetter(0), buffer=None, buffer_size=None)

A publisher: similar to a duplicator but allowing for topic-based duplication.

As in the case of duplicators, the duplication process for any particular topic is processed in lockstep: i.e. if any particular subscriber blocks on put, the whole operation is blocked. Hence buffers should be used in appropriate situations, either globally by setting the buffer and buffer_size parameters, or individually for each subscription channel.

Parameters:
- inp – the channel to be used as the source of the publication.
- topic_fn – a function accepting one argument and returning one result. This will be applied to each value as it comes in from inp, and the result will be used as the topic for subscription. A topic of None is not allowed. If topic_fn is None, the values from inp are assumed to be tuples, and the first element of each tuple is the topic.
- buffer – together with buffer_size, will be used to determine the buffering of each topic. The acceptable values are the same as for the constructor of aiochan.channel.Chan().
- buffer_size – see above.

sub(topic, out=None, buffer=None, buffer_size=None, close=True)

Subscribe out to topic.

Parameters:
- topic – the topic to subscribe to.
- out – the subscribing channel. If None, an unbuffered channel will be used.
- buffer – buffer of the internal channel; only applies if out is None.
- buffer_size – buffer_size of the internal channel; only applies if out is None.
- close – whether to close the subscribing channel when the input is closed.

Returns: the subscribing channel.
unsub(topic, out)

Stop the subscription of out to topic.

Parameters:
- topic – the topic to unsubscribe from.
- out – the channel to unsubscribe.

Returns: the unsubscribing channel.
unsub_all(topic)

Stop all subscriptions under a topic.

Parameters: topic – the topic to stop. If None, all subscriptions are stopped.

Returns: self
close()

Close the subscription.

Returns: self
aiochan.channel.go(coro, loop=None)

Spawn a coroutine in the specified loop. The loop will stop when the coroutine exits.

Parameters:
- coro – the coroutine to spawn.
- loop – the event loop to run the coroutine, or the current loop if None.

Returns: An awaitable containing the result of the coroutine.
aiochan.channel.nop()

Useful for yielding control to the scheduler.

aiochan.channel.run_in_thread(coro, loop=None)

Spawn a coroutine in the specified loop on a background thread. The loop will stop when the coroutine exits, and then the background thread will complete.

Parameters:
- coro – the coroutine to spawn.
- loop – the event loop to run the coroutine, or a newly created loop if None.

Returns: (loop, thread), where loop is the loop on which the coroutine is run, and thread is the thread on which the loop is run.
aiochan.channel.run(coro, loop=None)

Run coroutine in loop on the current thread. Will block until the coroutine is complete.

Parameters:
- coro – the coroutine to run.
- loop – the event loop to run the coroutine, or a newly created loop if None.

Returns: None.
aiochan.channel.MAX_OP_QUEUE_SIZE = 1024

The maximum number of pending puts or pending gets for a channel.

Usually you should leave this option as it is. If you find yourself receiving exceptions due to put/get queue size exceeding limits, you should consider using appropriate aiochan.buffers when creating the channels.

aiochan.channel.MAX_DIRTY_SIZE = 256

The size of cancelled operations in put/get queues before a cleanup is triggered (an operation can only become cancelled due to the aiochan.channel.select() or operations using it, or in other words, there is no direct user control of cancellation).

## Buffer

class aiochan.buffers.AbstractBuffer

Abstract buffer class intended for subclassing, to be used by channels.

add(el)

Add an element to the buffer.

Will only be called after can_add returns True.

Parameters: el – the element to add.

Returns: None
take()

Take an element from the buffer.

Will only be called after can_take returns True.

Returns: an element from the buffer.

can_add

Will be called each time before calling add.

Returns: bool, whether an element can be added.
can_take

Will be called each time before calling take.

Returns: bool, whether an element can be taken.
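
The four members above form a small protocol. The sketch below shows a class following that protocol; it is a standalone illustration that deliberately does not subclass aiochan.buffers.AbstractBuffer, `BoundedFifoBuffer` is a hypothetical name, and modelling can_add and can_take as properties is an assumption based on how they are listed above.

```python
from collections import deque


class BoundedFifoBuffer:
    """Hypothetical buffer following the add/take/can_add/can_take protocol."""

    def __init__(self, maxsize):
        self._maxsize = maxsize
        self._items = deque()

    def add(self, el):
        # The caller is expected to have checked can_add first.
        self._items.append(el)

    def take(self):
        # The caller is expected to have checked can_take first.
        return self._items.popleft()

    @property
    def can_add(self):
        return len(self._items) < self._maxsize

    @property
    def can_take(self):
        return len(self._items) > 0


buf = BoundedFifoBuffer(2)
buf.add(1)
buf.add(2)
full = not buf.can_add  # True: two elements in a buffer of size 2
first = buf.take()      # 1: elements come out in insertion order
```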
class aiochan.buffers.FixedLengthBuffer(maxsize)

A fixed length buffer that will block on get when empty and block on put when full.

Parameters: maxsize – size of the buffer
class aiochan.buffers.DroppingBuffer(maxsize)

A dropping buffer that will block on get when empty and never blocks on put.

When the buffer is full, puts will succeed but the new values are dropped.

Parameters: maxsize – size of the buffer
class aiochan.buffers.SlidingBuffer(maxsize)

A sliding buffer that will block on get when empty and never blocks on put.

When the buffer is full, puts will succeed and the oldest values are dropped.

Parameters: maxsize – size of the buffer
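
The difference between the dropping and sliding policies shows up only when the buffer is full. A minimal sketch with collections.deque (not the aiochan classes; `put_dropping` is a hypothetical helper):

```python
from collections import deque


def put_dropping(buf, maxsize, value):
    # Dropping policy: when full, the put "succeeds" but the new value is discarded.
    if len(buf) < maxsize:
        buf.append(value)


dropping = deque()
# Sliding policy: a full deque with maxlen discards its oldest item on append.
sliding = deque(maxlen=2)

for v in [1, 2, 3]:
    put_dropping(dropping, 2, v)
    sliding.append(v)

print(list(dropping))  # [1, 2]  (3 was dropped)
print(list(sliding))   # [2, 3]  (1 was dropped)
```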
class aiochan.buffers.PromiseBuffer(_=None)

A promise buffer that blocks on get when empty and never blocks on put.

After a single value is put into the buffer, all subsequent gets will succeed with this value, and all subsequent puts will succeed but new values are ignored.

class aiochan.buffers.IterBuffer(it)

A buffer that is constructed from an iterable (an unbounded iterable is ok).

The buffer never accepts new inputs and will give out items from the iterable one by one, and when the iterable is exhausted will block on further gets.

Parameters: it – the iterable to construct the buffer from.