Skip to content

aea.multiplexer

Module for the multiplexer class and related classes.

MultiplexerStatus Objects

class MultiplexerStatus(AsyncState)

The connection status class.

__init__

 | __init__() -> None

Initialize the connection status.

is_connected

 | @property
 | is_connected() -> bool

Return is connected.

is_connecting

 | @property
 | is_connecting() -> bool

Return is connecting.

is_disconnected

 | @property
 | is_disconnected() -> bool

Return is disconnected.

is_disconnecting

 | @property
 | is_disconnecting() -> bool

Return is disconnected.

AsyncMultiplexer Objects

class AsyncMultiplexer(Runnable,  WithLogger)

This class can handle multiple connections at once.

__init__

 | __init__(connections: Optional[Sequence[Connection]] = None, default_connection_index: int = 0, loop: Optional[AbstractEventLoop] = None, exception_policy: ExceptionPolicyEnum = ExceptionPolicyEnum.propagate, threaded: bool = False, agent_name: str = "standalone", default_routing: Optional[Dict[PublicId, PublicId]] = None, default_connection: Optional[PublicId] = None, protocols: Optional[List[Union[Protocol, Message]]] = None) -> None

Initialize the connection multiplexer.

Arguments:

  • connections: a sequence of connections.
  • default_connection_index: the index of the connection to use as default. This information is used for envelopes which don't specify any routing context. If connections is None, this parameter is ignored.
  • loop: the event loop to run the multiplexer. If None, a new event loop is created.
  • exception_policy: the exception policy used for connections.
  • threaded: if True, run in threaded mode, else async
  • agent_name: the name of the agent that owns the multiplexer, for logging purposes.
  • default_routing: default routing map
  • default_connection: default connection
  • protocols: protocols used

default_connection

 | @property
 | default_connection() -> Optional[Connection]

Get the default connection.

in_queue

 | @property
 | in_queue() -> AsyncFriendlyQueue

Get the in queue.

out_queue

 | @property
 | out_queue() -> asyncio.Queue

Get the out queue.

connections

 | @property
 | connections() -> Tuple[Connection, ...]

Get the connections.

is_connected

 | @property
 | is_connected() -> bool

Check whether the multiplexer is processing envelopes.

default_routing

 | @property
 | default_routing() -> Dict[PublicId, PublicId]

Get the default routing.

default_routing

 | @default_routing.setter
 | default_routing(default_routing: Dict[PublicId, PublicId]) -> None

Set the default routing.

connection_status

 | @property
 | connection_status() -> MultiplexerStatus

Get the connection status.

run

 | async run() -> None

Run multiplexer connect and receive/send tasks.

set_loop

 | set_loop(loop: AbstractEventLoop) -> None

Set event loop and all event loop related objects.

Arguments:

  • loop: asyncio event loop.

add_connection

 | add_connection(connection: Connection, is_default: bool = False) -> None

Add a connection to the multiplexer.

Arguments:

  • connection: the connection to add.
  • is_default: whether the connection added should be the default one.

connect

 | async connect() -> None

Connect the multiplexer.

disconnect

 | async disconnect() -> None

Disconnect the multiplexer.

get

 | get(block: bool = False, timeout: Optional[float] = None) -> Optional[Envelope]

Get an envelope within a timeout.

Arguments:

  • block: make the call blocking (ignore the timeout).
  • timeout: the timeout to wait until an envelope is received.

Returns:

the envelope, or None if no envelope is available within a timeout.

async_get

 | async async_get() -> Envelope

Get an envelope async way.

Returns:

the envelope

async_wait

 | async async_wait() -> None

Get an envelope async way.

Returns:

the envelope

put

 | put(envelope: Envelope) -> None

Schedule an envelope for sending it.

Notice that the output queue is an asyncio.Queue which uses an event loop running on a different thread than the one used in this function.

Arguments:

  • envelope: the envelope to be sent.

Multiplexer Objects

class Multiplexer(AsyncMultiplexer)

Transit sync multiplexer for compatibility.

__init__

 | __init__(*args: Any, **kwargs: Any) -> None

Initialize the connection multiplexer.

Arguments:

  • args: arguments
  • kwargs: keyword arguments

set_loop

 | set_loop(loop: AbstractEventLoop) -> None

Set event loop and all event loop related objects.

Arguments:

  • loop: asyncio event loop.

connect

 | connect() -> None

Connect the multiplexer.

Synchronously in thread spawned if new loop created.

disconnect

 | disconnect() -> None

Disconnect the multiplexer.

Also stops a dedicated thread for event loop if spawned on connect.

put

 | put(envelope: Envelope) -> None

Schedule an envelope for sending it.

Notice that the output queue is an asyncio.Queue which uses an event loop running on a different thread than the one used in this function.

Arguments:

  • envelope: the envelope to be sent.

InBox Objects

class InBox()

A queue from where you can only consume envelopes.

__init__

 | __init__(multiplexer: AsyncMultiplexer) -> None

Initialize the inbox.

Arguments:

  • multiplexer: the multiplexer

empty

 | empty() -> bool

Check for a envelope on the in queue.

Returns:

boolean indicating whether there is an envelope or not

get

 | get(block: bool = False, timeout: Optional[float] = None) -> Envelope

Check for a envelope on the in queue.

Arguments:

  • block: make the call blocking (ignore the timeout).
  • timeout: times out the block after timeout seconds.

Returns:

the envelope object.

Raises:

  • Empty: if the attempt to get an envelope fails.

get_nowait

 | get_nowait() -> Optional[Envelope]

Check for a envelope on the in queue and wait for no time.

Returns:

the envelope object

async_get

 | async async_get() -> Envelope

Check for a envelope on the in queue.

Returns:

the envelope object.

async_wait

 | async async_wait() -> None

Check for a envelope on the in queue.

OutBox Objects

class OutBox()

A queue from where you can only enqueue envelopes.

__init__

 | __init__(multiplexer: AsyncMultiplexer) -> None

Initialize the outbox.

Arguments:

  • multiplexer: the multiplexer

empty

 | empty() -> bool

Check for a envelope on the in queue.

Returns:

boolean indicating whether there is an envelope or not

put

 | put(envelope: Envelope) -> None

Put an envelope into the queue.

Arguments:

  • envelope: the envelope.

put_message

 | put_message(message: Message, context: Optional[EnvelopeContext] = None) -> None

Put a message in the outbox.

This constructs an envelope with the input arguments.

Arguments:

  • message: the message
  • context: the envelope context
Back to top