Prometheus monitoring
AEAs can create and update prometheus metrics for remote monitoring by sending messages to the prometheus connection fetchai/prometheus:0.9.0
.
To see this working in an agent, fetch and run the coin_price_feed
agent and check localhost:9090/metrics
to see the latest values of the metrics num_retrievals
and num_requests
:
To use this connection, add a model prometheus_dialogues
to your skill to handle the metrics configuration and messages to the prometheus connection.
Click here for example
class PrometheusDialogues(Model, BasePrometheusDialogues):
"""The dialogues class keeps track of all prometheus dialogues."""
def __init__(self, **kwargs) -> None:
"""
Initialize dialogues.
:return: None
"""
self.enabled = kwargs.pop("enabled", False)
self.metrics = kwargs.pop("metrics", [])
Model.__init__(self, **kwargs)
def role_from_first_message( # pylint: disable=unused-argument
message: Message, receiver_address: Address
) -> BaseDialogue.Role:
"""Infer the role of the agent from an incoming/outgoing first message
:param message: an incoming/outgoing first message
:param receiver_address: the address of the receiving agent
:return: The role of the agent
"""
return PrometheusDialogue.Role.AGENT
BasePrometheusDialogues.__init__(
self,
self_address=str(self.skill_id),
role_from_first_message=role_from_first_message,
)
Then configure your metrics in the skill.yaml
file. For example (from the advanced_data_request
skill):
models:
prometheus_dialogues:
args:
enabled: true
metrics:
- name: num_retrievals
type: Gauge
description: Number of price quotes retrieved
labels: {}
- name: num_requests
type: Gauge
description: Number of price quote requests served
labels: {}
class_name: PrometheusDialogues
Add a metric metric_name
of type metric_type
{Gauge
, Counter
, ...} and description description
by sending a message with performative ADD_METRIC
to the prometheus connection:
def add_prometheus_metric(
self,
metric_name: str,
metric_type: str,
description: str,
labels: Dict[str, str],
) -> None:
"""
Add a prometheus metric.
:param metric_name: the name of the metric to add.
:param type: the type of the metric.
:param description: a description of the metric.
:param labels: the metric labels.
:return: None
"""
# context
prom_dialogues = cast(PrometheusDialogues, self.context.prometheus_dialogues)
# prometheus update message
message, _ = prom_dialogues.create(
counterparty=str(PROM_CONNECTION_ID),
performative=PrometheusMessage.Performative.ADD_METRIC,
type=metric_type,
title=metric_name,
description=description,
labels=labels,
)
# send message
self.context.outbox.put_message(message=message)
PROM_CONNECTION_ID
should be imported to your skill as follows:
Update metric metric_name
with update function update_func
{inc
, set
, observe
, ...} and value value
by sending a message with performative UPDATE_METRIC
to the prometheus connection:
def update_prometheus_metric(
self, metric_name: str, update_func: str, value: float, labels: Dict[str, str],
) -> None:
"""
Update a prometheus metric.
:param metric_name: the name of the metric.
:param update_func: the name of the update function (e.g. inc, dec, set, ...).
:param value: the value to provide to the update function.
:param labels: the metric labels.
:return: None
"""
# context
prom_dialogues = cast(PrometheusDialogues, self.context.prometheus_dialogues)
# prometheus update message
message, _ = prom_dialogues.create(
counterparty=str(PROM_CONNECTION_ID),
performative=PrometheusMessage.Performative.UPDATE_METRIC,
title=metric_name,
callable=update_func,
value=value,
labels=labels,
)
# send message
self.context.outbox.put_message(message=message)
Initialize the metrics from the configuration file in the behaviour setup:
def setup(self) -> None:
"""Implement the setup of the behaviour"""
prom_dialogues = cast(PrometheusDialogues, self.context.prometheus_dialogues)
if prom_dialogues.enabled:
for metric in prom_dialogues.metrics:
self.context.logger.info("Adding Prometheus metric: " + metric["name"])
self.add_prometheus_metric(
metric["name"], metric["type"], metric["description"], dict(metric["labels"]),
Then call the update_prometheus_metric
function from the appropriate places.
For example, the following code in handlers.py
for the advanced_data_request
skill updates the number of http requests served:
if self.context.prometheus_dialogues.enabled:
self.context.behaviours.advanced_data_request_behaviour.update_prometheus_metric(
"num_requests", "inc", 1.0, {}
)
Finally, you can add a PrometheusHandler
to your skill to process response messages from the prometheus connection.
Click here for example
class PrometheusHandler(Handler):
"""This class handles responses from the prometheus server."""
SUPPORTED_PROTOCOL = PrometheusMessage.protocol_id
def __init__(self, **kwargs):
"""Initialize the handler."""
super().__init__(**kwargs)
self.handled_message = None
def setup(self) -> None:
"""Set up the handler."""
if self.context.prometheus_dialogues.enabled:
self.context.logger.info("setting up PrometheusHandler")
def handle(self, message: Message) -> None:
"""
Implement the reaction to a message.
:param message: the message
:return: None
"""
message = cast(PrometheusMessage, message)
# recover dialogue
prometheus_dialogues = cast(
PrometheusDialogues, self.context.prometheus_dialogues
)
prometheus_dialogue = cast(
PrometheusDialogue, prometheus_dialogues.update(message)
)
if prometheus_dialogue is None:
self._handle_unidentified_dialogue(message)
return
self.handled_message = message
if message.performative == PrometheusMessage.Performative.RESPONSE:
self.context.logger.debug(
f"Prometheus response ({message.code}): {message.message}"
)
else:
self.context.logger.debug(
f"got unexpected prometheus message: Performative = {PrometheusMessage.Performative}"
)
def _handle_unidentified_dialogue(self, msg: Message) -> None:
"""
Handle an unidentified dialogue.
:param msg: the unidentified message to be handled
:return: None
"""
self.context.logger.info(
"received invalid message={}, unidentified dialogue.".format(msg)
)
def teardown(self) -> None:
"""
Teardown the handler.
:return: None
"""