upsies.utils.signal

Managing callbacks

Classes

class upsies.utils.signal.Signal(id=None, *, signals)[source]

Bases: object

Simple callback registry

class Stopped(value)[source]

Bases: Flag

Three-way boolean enum: false, true and immediately

property id

Instance ID used for debugging

add(signal, *, record=False)[source]

Create new signal

Parameters:
  • signal (hashable) – Any hashable object

  • record (bool) – Whether emissions of this signal are recorded in emissions_recorded

Raises:

property signals

Mutable dictionary of signals mapped to lists of callbacks

record(signal)[source]

Record emissions of signal in emissions

property recording

list of signals that are recorded

register(signal, callback)[source]

Call callback when signal is emitted

Parameters:
  • signal (hashable) – Previously added signal

  • callback (callable) – Any callback. The signature depends on the caller of emit().

Raises:

emit(signal, *args, **kwargs)[source]

Call the callbacks that are registered to signal, make any ongoing receive_all() calls iterate, and make any ongoing receive_one() calls return

Parameters:

signal (hashable) – Previously added signal

Any other arguments are passed on to the callbacks.
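
The add(), register() and emit() flow described above can be sketched with a small stand-in class. MiniSignal is hypothetical and is not the upsies implementation; it only mirrors the documented behavior that emit() passes its extra arguments on to every registered callback and that each call is stored as a (signal, {"args": ..., "kwargs": ...}) tuple. The ValueError for an unknown signal is an assumption made for this sketch.

```python
class MiniSignal:
    """Hypothetical stand-in for a callback registry (not upsies' code)."""

    def __init__(self):
        self._callbacks = {}  # signal -> list of callbacks
        self.emissions = []   # (signal, {"args": ..., "kwargs": ...})

    def add(self, signal):
        # Any hashable object can serve as a signal.
        self._callbacks[signal] = []

    def register(self, signal, callback):
        # Assumption: unknown signals are rejected with ValueError.
        if signal not in self._callbacks:
            raise ValueError(f"Unknown signal: {signal!r}")
        self._callbacks[signal].append(callback)

    def emit(self, signal, *args, **kwargs):
        # Store the call, then pass the arguments on to each callback.
        self.emissions.append((signal, {"args": args, "kwargs": kwargs}))
        for callback in self._callbacks[signal]:
            callback(*args, **kwargs)


received = []
sig = MiniSignal()
sig.add("progress")
sig.register(
    "progress",
    lambda percent, *, label="": received.append((percent, label)),
)
sig.emit("progress", 50, label="hashing")
sig.emit("progress", 100)
```

The callback signature is dictated by whoever calls emit(): here the caller sends one positional argument and an optional label keyword, so the callback accepts exactly that.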

async receive_all(signal, *, only_posargs=False)[source]

Iterate over (args, kwargs) from emit() calls for signal

This will always produce all emitted signals, both from the past and the future, until stop() is called. The only exception is when stop() is called with immediately=True, in which case iteration stops immediately.

Parameters:
  • signal (hashable) – Previously added signal

  • only_posargs (bool) – Whether to ignore any keyword arguments from the emit() call and only iterate over positional arguments
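
The "past and future" behavior of receive_all() can be modeled with an async generator that replays a backlog and then waits for new emissions. The sketch below is a hypothetical single-signal stand-in, not the upsies implementation; it assumes that only_posargs=True yields just the tuple of positional arguments.

```python
import asyncio


class MiniSignal:
    """Hypothetical single-signal stand-in that models receive_all()."""

    def __init__(self):
        self._emissions = []             # every (args, kwargs) so far
        self._stopped = False
        self._changed = asyncio.Event()  # set on every emit() and stop()

    def emit(self, *args, **kwargs):
        if self._stopped:
            raise RuntimeError("emit() called after stop()")
        self._emissions.append((args, kwargs))
        self._changed.set()

    def stop(self):
        self._stopped = True
        self._changed.set()

    async def receive_all(self, *, only_posargs=False):
        index = 0
        while True:
            # Produce everything not seen yet, including emissions that
            # were made before iteration started.
            while index < len(self._emissions):
                args, kwargs = self._emissions[index]
                index += 1
                yield args if only_posargs else (args, kwargs)
            if self._stopped:
                return
            # Nothing left: sleep until the next emit() or stop().
            self._changed.clear()
            await self._changed.wait()


async def main():
    sig = MiniSignal()
    sig.emit("a", n=1)  # emitted before anyone is listening

    async def producer():
        await asyncio.sleep(0)
        sig.emit("b", n=2)  # emitted while the receiver is waiting
        sig.stop()

    task = asyncio.create_task(producer())
    seen = [args async for args in sig.receive_all(only_posargs=True)]
    await task
    return seen


seen = asyncio.run(main())
```

Keeping the full backlog and a per-iterator index is what lets a late receiver still see earlier emissions; a plain queue would hand each emission to only one consumer.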

async receive_one(signal, *, only_posargs=False)[source]

Same as receive_all(), but return just one emission

async wait_for(signal)[source]

Wait for emission of signal and return None

stop(*, immediately=False)[source]

Do not emit() any more signals

Calling this method disallows further calls to emit(), stops iteration over any ongoing receive_all() calls, and makes any ongoing receive_one() calls return None

Parameters:

immediately (bool) –

If truthy, receive_all() calls stop iterating NOW without producing any more emissions, and receive_one() calls return None

Otherwise, any emissions made before stop() is called are processed normally before iteration stops.
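
The difference between the two stop modes can be illustrated with a three-state flag. The sketch below is only a model: the member names and values of this Stopped enum are assumptions (the text only says it is a Flag with the states false, true and immediately), and deliverable() is a hypothetical helper.

```python
import enum


class Stopped(enum.Flag):
    # Hypothetical member names and values; only the three states
    # themselves are taken from the documentation.
    FALSE = 0
    TRUE = 1
    IMMEDIATELY = 2


def deliverable(pending, stopped):
    """Return the already-made emissions a receiver would still get."""
    if stopped is Stopped.IMMEDIATELY:
        # Stop NOW: unconsumed emissions are not produced anymore.
        return []
    # Not stopped, or stopped gracefully: the backlog is still processed.
    return list(pending)


pending = [("done", 1), ("done", 2)]
graceful = deliverable(pending, Stopped.TRUE)
immediate = deliverable(pending, Stopped.IMMEDIATELY)
```

Because FALSE has the value 0, such a flag stays usable in plain boolean checks (both stopped states are truthy) while still distinguishing how the stop was requested.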

property is_stopped

Whether stop() was called (see Stopped)

This means emit() can no longer be called, any ongoing receive_all() calls will stop iterating, and any ongoing receive_one() calls return None.

property emissions

Sequence of emit() calls made so far

Each call is stored as a tuple like this:

(<signal>, {"args": <positional arguments>,
            "kwargs": <keyword arguments>})

property emissions_recorded

Sequence of only recorded emit() calls made so far

Each call is stored as a tuple like this:

(<signal>, {"args": <positional arguments>,
            "kwargs": <keyword arguments>})

replay(emissions)[source]

emit() previously recorded emissions
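
One way to picture how emissions, emissions_recorded and replay() fit together is the following stand-in. MiniSignal is hypothetical and not the upsies implementation; it assumes that replay() simply calls emit() once per stored tuple, unpacking "args" and "kwargs".

```python
class MiniSignal:
    """Hypothetical stand-in that models recording and replaying."""

    def __init__(self):
        self._callbacks = {}     # signal -> list of callbacks
        self._recording = set()  # signals whose emissions are recorded
        self.emissions = []      # all emit() calls so far

    def add(self, signal, *, record=False):
        self._callbacks[signal] = []
        if record:
            self._recording.add(signal)

    def register(self, signal, callback):
        self._callbacks[signal].append(callback)

    def emit(self, signal, *args, **kwargs):
        self.emissions.append((signal, {"args": args, "kwargs": kwargs}))
        for callback in self._callbacks[signal]:
            callback(*args, **kwargs)

    @property
    def emissions_recorded(self):
        # Only the emissions of signals that are being recorded.
        return [e for e in self.emissions if e[0] in self._recording]

    def replay(self, emissions):
        # Assumption: replaying means emitting each stored call again.
        for signal, payload in emissions:
            self.emit(signal, *payload["args"], **payload["kwargs"])


first = MiniSignal()
first.add("output", record=True)
first.add("progress")
first.emit("progress", 0.5)
first.emit("output", "abc123", kind="hash")

outputs = []
second = MiniSignal()
second.add("output")
second.register("output", lambda value, *, kind: outputs.append((kind, value)))
second.replay(first.emissions_recorded)
```

In this model the recorded emissions are plain data, so they could be stored and later fed to a fresh instance whose callbacks then run as if the original emit() calls had just happened.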

suspend(*signals)[source]

Context manager that blocks certain signals in its body

Parameters:

signals – Which signals to block

Raises:

ValueError – if any signal in signals is not registered
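
A context manager with these semantics could look roughly like the sketch below. MiniSignal is a hypothetical stand-in, not the upsies implementation, and it assumes that emissions of a blocked signal are silently dropped rather than deferred; the documentation does not say which.

```python
import contextlib


class MiniSignal:
    """Hypothetical stand-in that models suspend()."""

    def __init__(self, *signals):
        self._callbacks = {signal: [] for signal in signals}
        self._suspended = set()

    def register(self, signal, callback):
        self._callbacks[signal].append(callback)

    def emit(self, signal, *args, **kwargs):
        # Assumption: emissions of suspended signals are dropped.
        if signal in self._suspended:
            return
        for callback in self._callbacks[signal]:
            callback(*args, **kwargs)

    @contextlib.contextmanager
    def suspend(self, *signals):
        unknown = [s for s in signals if s not in self._callbacks]
        if unknown:
            raise ValueError(f"Unknown signals: {unknown!r}")
        # Remember only what this call blocks so that nested
        # suspend() calls do not unblock each other's signals.
        newly_blocked = set(signals) - self._suspended
        self._suspended |= newly_blocked
        try:
            yield
        finally:
            self._suspended -= newly_blocked


log = []
sig = MiniSignal("changed")
sig.register("changed", log.append)

sig.emit("changed", 1)
with sig.suspend("changed"):
    sig.emit("changed", 2)  # blocked inside the with body
sig.emit("changed", 3)

try:
    with sig.suspend("unknown"):
        pass
except ValueError:
    rejected = True
else:
    rejected = False
```

The try/finally ensures that the signals are unblocked again even if the body of the with statement raises.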