Skip to content

Create decision-maker transaction

This guide can be considered part 2 of the stand-alone transaction demo. The main difference is that we now use the decision-maker to sign the transaction.

First, import the libraries and set the constant values. (Get the packages directory from the AEA repository with `svn export https://github.com/fetchai/agents-aea.git/trunk/packages`.)

import logging
import time
from threading import Thread
from typing import Optional, cast

from aea_ledger_fetchai import FetchAICrypto

from aea.aea_builder import AEABuilder
from aea.configurations.base import PublicId, SkillConfig
from aea.crypto.helpers import create_private_key
from aea.crypto.ledger_apis import LedgerApis
from aea.crypto.wallet import Wallet
from aea.helpers.transaction.base import RawTransaction, Terms
from aea.identity.base import Identity
from aea.protocols.base import Address, Message
from aea.protocols.dialogue.base import Dialogue
from aea.skills.base import Handler, Model, Skill, SkillContext

from packages.fetchai.protocols.signing.dialogues import SigningDialogue
from packages.fetchai.protocols.signing.dialogues import (
    SigningDialogues as BaseSigningDialogues,
)
from packages.fetchai.protocols.signing.message import SigningMessage

from tests.conftest import get_wealth_if_needed


# Module-level "aea" logger; basicConfig sets the root logging level to INFO.
logger = logging.getLogger("aea")
logging.basicConfig(level=logging.INFO)

# Private key file names: file 1 is used for the AEA, file 2 for the counterparty.
FETCHAI_PRIVATE_KEY_FILE_1 = "fetchai_private_key_1.txt"
FETCHAI_PRIVATE_KEY_FILE_2 = "fetchai_private_key_2.txt"

Create a private key and an AEA

To have access to the decision-maker, which is responsible for signing transactions, we need to create an AEA. We can create an AEA with the builder, providing it with a private key that we generate first.

    # Create a private key for the Fetch.ai ledger in FETCHAI_PRIVATE_KEY_FILE_1
    create_private_key(
        FetchAICrypto.identifier, private_key_file=FETCHAI_PRIVATE_KEY_FILE_1
    )

    # Instantiate the builder and build the AEA
    # By default, the default protocol, error skill and stub connection are added
    builder = AEABuilder()

    # Name the agent
    builder.set_name("my_aea")

    # Register the key file created above under the same ledger identifier
    builder.add_private_key(FetchAICrypto.identifier, FETCHAI_PRIVATE_KEY_FILE_1)

    # Create our AEA
    my_aea = builder.build()

Add a simple skill

Add a simple skill with a signing handler and the signing dialogues.

    # add a simple skill with handler
    # the handler and the dialogues model below share this skill context
    skill_context = SkillContext(my_aea.context)
    skill_config = SkillConfig(name="simple_skill", author="fetchai", version="0.1.0")
    signing_handler = SigningHandler(
        skill_context=skill_context, name="signing_handler"
    )
    # the model is named "signing_dialogues"; the skill's public id (as a string)
    # is passed as the dialogues' own address
    signing_dialogues_model = SigningDialogues(
        skill_context=skill_context,
        name="signing_dialogues",
        self_address=str(skill_config.public_id),
    )

    # bundle handler and model into a skill and register it with the AEA
    simple_skill = Skill(
        skill_config,
        skill_context,
        handlers={signing_handler.name: signing_handler},
        models={signing_dialogues_model.name: signing_dialogues_model},
    )
    my_aea.resources.add_skill(simple_skill)

Create a second identity

    # create a second identity
    create_private_key(
        FetchAICrypto.identifier, private_key_file=FETCHAI_PRIVATE_KEY_FILE_2
    )

    # the wallet is keyed by the ledger identifier, so the address is looked up
    # under that same identifier rather than a separate string literal
    counterparty_wallet = Wallet({FetchAICrypto.identifier: FETCHAI_PRIVATE_KEY_FILE_2})
    get_wealth_if_needed(counterparty_wallet.addresses[FetchAICrypto.identifier])

    # the identity takes its addresses and public keys from the wallet
    counterparty_identity = Identity(
        name="counterparty_aea",
        addresses=counterparty_wallet.addresses,
        public_keys=counterparty_wallet.public_keys,
        default_address_key=FetchAICrypto.identifier,
    )

Create the signing message

Next, we create the signing message and send it to the decision-maker.

    # create signing message for decision maker to sign
    # terms of the transfer between the AEA (sender) and the counterparty
    terms = Terms(
        ledger_id=FetchAICrypto.identifier,
        sender_address=my_aea.identity.address,
        counterparty_address=counterparty_identity.address,
        amount_by_currency_id={"FET": -1},
        quantities_by_good_id={"some_service": 1},
        nonce="some_nonce",
        fee_by_currency_id={"FET": 0},
    )
    get_wealth_if_needed(terms.sender_address)

    # the dialogues model registered under the name "signing_dialogues"
    signing_dialogues = cast(SigningDialogues, skill_context.signing_dialogues)
    # build the transfer transaction from the values held by the terms
    stub_transaction = LedgerApis.get_transfer_transaction(
        terms.ledger_id,
        terms.sender_address,
        terms.counterparty_address,
        terms.sender_payable_amount,
        terms.sender_fee,
        terms.nonce,
    )
    # wrap the transaction and the terms in a SIGN_TRANSACTION request
    signing_msg = SigningMessage(
        performative=SigningMessage.Performative.SIGN_TRANSACTION,
        dialogue_reference=signing_dialogues.new_self_initiated_dialogue_reference(),
        raw_transaction=RawTransaction(FetchAICrypto.identifier, stub_transaction),
        terms=terms,
    )
    # open a dialogue addressed to "decision_maker" with this message
    signing_dialogue = cast(
        Optional[SigningDialogue],
        signing_dialogues.create_with_message("decision_maker", signing_msg),
    )
    assert signing_dialogue is not None
    # hand the request over through the decision maker's message queue
    my_aea.context.decision_maker_message_queue.put_nowait(signing_msg)

Run the agent

Finally, we run the agent and expect the signed transaction to be printed in the terminal.

    # Set the AEA running in a different thread
    # The thread is created and started before entering the try block so that
    # the cleanup below only runs once `t` exists and has been started;
    # joining an unstarted thread raises RuntimeError and would hide the
    # original failure.
    logger.info("STARTING AEA NOW!")
    t = Thread(target=my_aea.start)
    t.start()
    try:
        # Let it run long enough to interact with the decision maker
        time.sleep(1)
    finally:
        # Shut down the AEA
        logger.info("STOPPING AEA NOW!")
        my_aea.stop()
        t.join()

After the completion of the signing, we get the signed transaction.

More details

To register a handler that reads the internal messages, we have to define, at the end of the file, a handler class that processes the signing messages, together with a dialogues model class that keeps track of the signing dialogues.

class SigningDialogues(Model, BaseSigningDialogues):
    """Skill model that holds the signing dialogues."""

    def __init__(self, self_address: Address, **kwargs) -> None:
        """
        Initialize both base classes.

        :param self_address: the address passed on to the base signing dialogues
        :param kwargs: keyword arguments forwarded to the Model initializer
        :return: None
        """

        def _skill_role(  # pylint: disable=unused-argument
            message: Message, receiver_address: Address
        ) -> Dialogue.Role:
            """
            Give the role of the agent for a first message.

            :param message: an incoming/outgoing first message (not inspected)
            :param receiver_address: the address of the receiving agent (not inspected)
            :return: always the SKILL role
            """
            return SigningDialogue.Role.SKILL

        Model.__init__(self, **kwargs)
        BaseSigningDialogues.__init__(
            self, self_address=self_address, role_from_first_message=_skill_role
        )


class SigningHandler(Handler):
    """Handler that processes messages of the signing protocol."""

    SUPPORTED_PROTOCOL = SigningMessage.protocol_id  # type: Optional[PublicId]

    def setup(self) -> None:
        """Set up the handler; there is nothing to prepare."""

    def handle(self, message: Message) -> None:
        """
        Route a signing message to the matching private handler.

        :param message: the message
        :return: None
        """
        signing_msg = cast(SigningMessage, message)

        # match the message against the dialogues held by the model
        dialogues = cast(SigningDialogues, self.context.signing_dialogues)
        dialogue = cast(Optional[SigningDialogue], dialogues.update(signing_msg))
        if dialogue is None:
            self._handle_unidentified_dialogue(signing_msg)
            return

        # route on the performative; anything unexpected is reported as invalid
        performative = signing_msg.performative
        if performative is SigningMessage.Performative.SIGNED_TRANSACTION:
            self._handle_signed_transaction(signing_msg, dialogue)
            return
        if performative is SigningMessage.Performative.ERROR:
            self._handle_error(signing_msg, dialogue)
            return
        self._handle_invalid(signing_msg, dialogue)

    def teardown(self) -> None:
        """Tear down the handler; there is nothing to clean up."""

    def _handle_unidentified_dialogue(self, signing_msg: SigningMessage) -> None:
        """
        Log a message that could not be matched to any dialogue.

        :param signing_msg: the signing message
        :return: None
        """
        template = "received invalid signing message={}, unidentified dialogue."
        self.context.logger.info(template.format(signing_msg))

    def _handle_signed_transaction(
        self, signing_msg: SigningMessage, signing_dialogue: SigningDialogue
    ) -> None:
        """
        Log a successfully signed transaction.

        :param signing_msg: the signing message
        :param signing_dialogue: the dialogue (not used)
        :return: None
        """
        self.context.logger.info("transaction signing was successful.")
        # the signed transaction itself goes to the module-level logger
        signed_transaction = signing_msg.signed_transaction
        logger.info(signed_transaction)

    def _handle_error(
        self, signing_msg: SigningMessage, signing_dialogue: SigningDialogue
    ) -> None:
        """
        Log a signing error reported in a signing message.

        :param signing_msg: the signing message
        :param signing_dialogue: the dialogue
        :return: None
        """
        template = "transaction signing was not successful. Error_code={} in dialogue={}"
        report = template.format(signing_msg.error_code, signing_dialogue)
        self.context.logger.info(report)

    def _handle_invalid(
        self, signing_msg: SigningMessage, signing_dialogue: SigningDialogue
    ) -> None:
        """
        Log a warning for a signing message with an unhandled performative.

        :param signing_msg: the signing message
        :param signing_dialogue: the dialogue
        :return: None
        """
        template = "cannot handle signing message of performative={} in dialogue={}."
        report = template.format(signing_msg.performative, signing_dialogue)
        self.context.logger.warning(report)

You can find the full code for this example below:

Transaction via decision-maker full code
import logging
import time
from threading import Thread
from typing import Optional, cast

from aea_ledger_fetchai import FetchAICrypto

from aea.aea_builder import AEABuilder
from aea.configurations.base import PublicId, SkillConfig
from aea.crypto.helpers import create_private_key
from aea.crypto.ledger_apis import LedgerApis
from aea.crypto.wallet import Wallet
from aea.helpers.transaction.base import RawTransaction, Terms
from aea.identity.base import Identity
from aea.protocols.base import Address, Message
from aea.protocols.dialogue.base import Dialogue
from aea.skills.base import Handler, Model, Skill, SkillContext

from packages.fetchai.protocols.signing.dialogues import SigningDialogue
from packages.fetchai.protocols.signing.dialogues import (
    SigningDialogues as BaseSigningDialogues,
)
from packages.fetchai.protocols.signing.message import SigningMessage

from tests.conftest import get_wealth_if_needed


# Module-level "aea" logger; basicConfig sets the root logging level to INFO.
logger = logging.getLogger("aea")
logging.basicConfig(level=logging.INFO)

# Private key file names: file 1 is used for the AEA, file 2 for the counterparty.
FETCHAI_PRIVATE_KEY_FILE_1 = "fetchai_private_key_1.txt"
FETCHAI_PRIVATE_KEY_FILE_2 = "fetchai_private_key_2.txt"


def run():
    """
    Run the decision-maker transaction demo.

    Build an AEA from a newly created Fetch.ai private key, register a skill
    holding the signing handler and the signing dialogues model, create a
    counterparty identity from a second key, put a SIGN_TRANSACTION message on
    the decision maker's message queue and run the agent for one second in a
    background thread before stopping it again.

    The function calls ``create_private_key`` for both key file names and
    ``get_wealth_if_needed`` for both addresses.
    """

    # Create a private key
    create_private_key(
        FetchAICrypto.identifier, private_key_file=FETCHAI_PRIVATE_KEY_FILE_1
    )

    # Instantiate the builder and build the AEA
    # By default, the default protocol, error skill and stub connection are added
    builder = AEABuilder()

    builder.set_name("my_aea")

    builder.add_private_key(FetchAICrypto.identifier, FETCHAI_PRIVATE_KEY_FILE_1)

    # Create our AEA
    my_aea = builder.build()

    # add a simple skill with handler
    skill_context = SkillContext(my_aea.context)
    skill_config = SkillConfig(name="simple_skill", author="fetchai", version="0.1.0")
    signing_handler = SigningHandler(
        skill_context=skill_context, name="signing_handler"
    )
    signing_dialogues_model = SigningDialogues(
        skill_context=skill_context,
        name="signing_dialogues",
        self_address=str(skill_config.public_id),
    )

    simple_skill = Skill(
        skill_config,
        skill_context,
        handlers={signing_handler.name: signing_handler},
        models={signing_dialogues_model.name: signing_dialogues_model},
    )
    my_aea.resources.add_skill(simple_skill)

    # create a second identity
    create_private_key(
        FetchAICrypto.identifier, private_key_file=FETCHAI_PRIVATE_KEY_FILE_2
    )

    # the wallet is keyed by the ledger identifier, so the address is looked up
    # under that same identifier rather than a separate string literal
    counterparty_wallet = Wallet({FetchAICrypto.identifier: FETCHAI_PRIVATE_KEY_FILE_2})
    get_wealth_if_needed(counterparty_wallet.addresses[FetchAICrypto.identifier])

    counterparty_identity = Identity(
        name="counterparty_aea",
        addresses=counterparty_wallet.addresses,
        public_keys=counterparty_wallet.public_keys,
        default_address_key=FetchAICrypto.identifier,
    )

    # create signing message for decision maker to sign
    terms = Terms(
        ledger_id=FetchAICrypto.identifier,
        sender_address=my_aea.identity.address,
        counterparty_address=counterparty_identity.address,
        amount_by_currency_id={"FET": -1},
        quantities_by_good_id={"some_service": 1},
        nonce="some_nonce",
        fee_by_currency_id={"FET": 0},
    )
    get_wealth_if_needed(terms.sender_address)

    signing_dialogues = cast(SigningDialogues, skill_context.signing_dialogues)
    stub_transaction = LedgerApis.get_transfer_transaction(
        terms.ledger_id,
        terms.sender_address,
        terms.counterparty_address,
        terms.sender_payable_amount,
        terms.sender_fee,
        terms.nonce,
    )
    signing_msg = SigningMessage(
        performative=SigningMessage.Performative.SIGN_TRANSACTION,
        dialogue_reference=signing_dialogues.new_self_initiated_dialogue_reference(),
        raw_transaction=RawTransaction(FetchAICrypto.identifier, stub_transaction),
        terms=terms,
    )
    signing_dialogue = cast(
        Optional[SigningDialogue],
        signing_dialogues.create_with_message("decision_maker", signing_msg),
    )
    assert signing_dialogue is not None
    my_aea.context.decision_maker_message_queue.put_nowait(signing_msg)

    # Set the AEA running in a different thread
    # The thread is created and started before entering the try block so that
    # the cleanup below only runs once `t` exists and has been started;
    # joining an unstarted thread raises RuntimeError and would hide the
    # original failure.
    logger.info("STARTING AEA NOW!")
    t = Thread(target=my_aea.start)
    t.start()
    try:
        # Let it run long enough to interact with the decision maker
        time.sleep(1)
    finally:
        # Shut down the AEA
        logger.info("STOPPING AEA NOW!")
        my_aea.stop()
        t.join()


class SigningDialogues(Model, BaseSigningDialogues):
    """Skill model that holds the signing dialogues."""

    def __init__(self, self_address: Address, **kwargs) -> None:
        """
        Initialize both base classes.

        :param self_address: the address passed on to the base signing dialogues
        :param kwargs: keyword arguments forwarded to the Model initializer
        :return: None
        """

        def _skill_role(  # pylint: disable=unused-argument
            message: Message, receiver_address: Address
        ) -> Dialogue.Role:
            """
            Give the role of the agent for a first message.

            :param message: an incoming/outgoing first message (not inspected)
            :param receiver_address: the address of the receiving agent (not inspected)
            :return: always the SKILL role
            """
            return SigningDialogue.Role.SKILL

        Model.__init__(self, **kwargs)
        BaseSigningDialogues.__init__(
            self, self_address=self_address, role_from_first_message=_skill_role
        )


class SigningHandler(Handler):
    """Handler that processes messages of the signing protocol."""

    SUPPORTED_PROTOCOL = SigningMessage.protocol_id  # type: Optional[PublicId]

    def setup(self) -> None:
        """Set up the handler; there is nothing to prepare."""

    def handle(self, message: Message) -> None:
        """
        Route a signing message to the matching private handler.

        :param message: the message
        :return: None
        """
        signing_msg = cast(SigningMessage, message)

        # match the message against the dialogues held by the model
        dialogues = cast(SigningDialogues, self.context.signing_dialogues)
        dialogue = cast(Optional[SigningDialogue], dialogues.update(signing_msg))
        if dialogue is None:
            self._handle_unidentified_dialogue(signing_msg)
            return

        # route on the performative; anything unexpected is reported as invalid
        performative = signing_msg.performative
        if performative is SigningMessage.Performative.SIGNED_TRANSACTION:
            self._handle_signed_transaction(signing_msg, dialogue)
            return
        if performative is SigningMessage.Performative.ERROR:
            self._handle_error(signing_msg, dialogue)
            return
        self._handle_invalid(signing_msg, dialogue)

    def teardown(self) -> None:
        """Tear down the handler; there is nothing to clean up."""

    def _handle_unidentified_dialogue(self, signing_msg: SigningMessage) -> None:
        """
        Log a message that could not be matched to any dialogue.

        :param signing_msg: the signing message
        :return: None
        """
        template = "received invalid signing message={}, unidentified dialogue."
        self.context.logger.info(template.format(signing_msg))

    def _handle_signed_transaction(
        self, signing_msg: SigningMessage, signing_dialogue: SigningDialogue
    ) -> None:
        """
        Log a successfully signed transaction.

        :param signing_msg: the signing message
        :param signing_dialogue: the dialogue (not used)
        :return: None
        """
        self.context.logger.info("transaction signing was successful.")
        # the signed transaction itself goes to the module-level logger
        signed_transaction = signing_msg.signed_transaction
        logger.info(signed_transaction)

    def _handle_error(
        self, signing_msg: SigningMessage, signing_dialogue: SigningDialogue
    ) -> None:
        """
        Log a signing error reported in a signing message.

        :param signing_msg: the signing message
        :param signing_dialogue: the dialogue
        :return: None
        """
        template = "transaction signing was not successful. Error_code={} in dialogue={}"
        report = template.format(signing_msg.error_code, signing_dialogue)
        self.context.logger.info(report)

    def _handle_invalid(
        self, signing_msg: SigningMessage, signing_dialogue: SigningDialogue
    ) -> None:
        """
        Log a warning for a signing message with an unhandled performative.

        :param signing_msg: the signing message
        :param signing_dialogue: the dialogue
        :return: None
        """
        template = "cannot handle signing message of performative={} in dialogue={}."
        report = template.format(signing_msg.performative, signing_dialogue)
        self.context.logger.warning(report)


# Run the demo only when this file is executed as a script.
if __name__ == "__main__":
    run()
Back to top