aea.skills.tasks

This module contains the classes for tasks.

Task Objects

class Task(WithLogger)

This class implements an abstract task.

__init__

 | __init__(**kwargs: Any) -> None

Initialize a task.

__call__

 | __call__(*args: Any, **kwargs: Any) -> Any

Execute the task.

Arguments:

  • args: positional arguments forwarded to the 'execute' method.
  • kwargs: keyword arguments forwarded to the 'execute' method.

Returns:

the task instance

Raises:

  • ValueError: if the task has already been executed.

is_executed

 | @property
 | is_executed() -> bool

Check if the task has already been executed.

result

 | @property
 | result() -> Any

Get the result.

Returns:

the result from the execute method.

Raises:

  • ValueError: if the task has not been executed yet.

setup

 | setup() -> None

Implement the task setup.

execute

 | @abstractmethod
 | execute(*args: Any, **kwargs: Any) -> Any

Run the task logic.

Arguments:

  • args: the positional arguments
  • kwargs: the keyword arguments

Returns:

the value produced by the task logic; it is later available through the result property.

teardown

 | teardown() -> None

Implement the task teardown.
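
The lifecycle described above (call once, then read the result) can be sketched without importing aea. The classes SketchTask and SumTask below are hypothetical stand-ins, not the library's code. The order setup, execute, teardown and the choice to run teardown even when execute raises are assumptions made for illustration.

```python
from abc import ABC, abstractmethod
from typing import Any, List


class SketchTask(ABC):
    """Stand-in that mimics the documented Task interface (not the aea class)."""

    def __init__(self) -> None:
        self._is_executed = False
        self._result: Any = None

    def __call__(self, *args: Any, **kwargs: Any) -> "SketchTask":
        # A task may run only once, as documented for __call__.
        if self._is_executed:
            raise ValueError("Task already executed.")
        self.setup()
        try:
            self._result = self.execute(*args, **kwargs)
        finally:
            # Assumption: teardown runs even if execute raises.
            self._is_executed = True
            self.teardown()
        return self  # the task instance, not the result

    @property
    def is_executed(self) -> bool:
        return self._is_executed

    @property
    def result(self) -> Any:
        if not self._is_executed:
            raise ValueError("Task not executed yet.")
        return self._result

    def setup(self) -> None:
        """Optional hook; does nothing by default."""

    @abstractmethod
    def execute(self, *args: Any, **kwargs: Any) -> Any:
        """Task logic supplied by subclasses."""

    def teardown(self) -> None:
        """Optional hook; does nothing by default."""


class SumTask(SketchTask):
    """Hypothetical subclass that records the order of its hooks."""

    def __init__(self) -> None:
        super().__init__()
        self.calls: List[str] = []

    def setup(self) -> None:
        self.calls.append("setup")

    def execute(self, *args: Any, **kwargs: Any) -> Any:
        self.calls.append("execute")
        return sum(args)

    def teardown(self) -> None:
        self.calls.append("teardown")
```

Calling SumTask()(1, 2, 3) returns the task itself; the sum is then read from its result property, and a second call raises ValueError.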

init_worker

init_worker() -> None

Initialize a worker.

Disable the SIGINT handler in the workers of the process pool in use. This works around a well-known bug: https://bugs.python.org/issue8296
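
One common way to get this effect with the standard library is to pass a pool initializer that sets SIGINT to be ignored, so that only the parent process reacts to Ctrl+C. The sketch below shows that general technique. The names ignore_sigint and make_pool are hypothetical, and it is an assumption that init_worker works the same way.

```python
import signal
from multiprocessing.pool import Pool


def ignore_sigint() -> None:
    """Make the calling process ignore SIGINT (Ctrl+C)."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def make_pool(nb_workers: int) -> Pool:
    """Create a process pool whose workers run ignore_sigint once at startup."""
    return Pool(processes=nb_workers, initializer=ignore_sigint)
```

With this setup the parent keeps its own SIGINT handler and decides when to terminate or close the pool.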

TaskManager Objects

class TaskManager(WithLogger)

A Task manager.

__init__

 | __init__(nb_workers: int = DEFAULT_WORKERS_AMOUNT, is_lazy_pool_start: bool = True, logger: Optional[logging.Logger] = None, pool_mode: str = THREAD_POOL_MODE) -> None

Initialize the task manager.

Arguments:

  • nb_workers: the number of workers (threads or processes, depending on pool_mode).
  • is_lazy_pool_start: option to postpone pool creation until the first enqueue_task call.
  • logger: the logger.
  • pool_mode: the pool mode, either multithread or multiprocess.
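
As a rough illustration of how a pool_mode string could select the kind of pool, the sketch below maps a mode to a standard-library pool class. The constant names, their string values and the helper pool_class_for are assumptions for this example; the values are taken from the wording of the pool_mode description above, not from the library source.

```python
from multiprocessing.pool import Pool, ThreadPool
from typing import Dict, Type

# Hypothetical mode strings, mirroring the wording of the parameter description.
THREAD_MODE = "multithread"
PROCESS_MODE = "multiprocess"


def pool_class_for(pool_mode: str) -> Type[Pool]:
    """Return the pool class for a mode string (ThreadPool subclasses Pool)."""
    classes: Dict[str, Type[Pool]] = {
        THREAD_MODE: ThreadPool,
        PROCESS_MODE: Pool,
    }
    if pool_mode not in classes:
        raise ValueError(f"Unknown pool mode: {pool_mode!r}")
    return classes[pool_mode]
```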

is_started

 | @property
 | is_started() -> bool

Get started status of TaskManager.

Returns:

True if the task manager has been started, False otherwise.

nb_workers

 | @property
 | nb_workers() -> int

Get the number of workers.

Returns:

the number of workers.

enqueue_task

 | enqueue_task(func: Callable, args: Sequence = (), kwargs: Optional[Dict[str, Any]] = None) -> int

Enqueue a task with the executor.

Arguments:

  • func: the callable instance to be enqueued
  • args: the positional arguments to be passed to the function.
  • kwargs: the keyword arguments to be passed to the function.

Returns:

the task id, used to retrieve the result.

Raises:

  • ValueError: if the task manager is not running.

get_task_result

 | get_task_result(task_id: int) -> AsyncResult

Get the result from a task.

Arguments:

  • task_id: the task id

Returns:

async result for task_id

start

 | start() -> None

Start the task manager.

stop

 | stop() -> None

Stop the task manager.
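
The start, enqueue, fetch, stop flow can be sketched with the standard library's ThreadPool, whose apply_async returns an AsyncResult. MiniTaskManager is a hypothetical stand-in, not the aea implementation. How ids are assigned, when the lazy pool is created and how stop shuts the pool down are all assumptions.

```python
from multiprocessing.pool import AsyncResult, ThreadPool
from typing import Any, Callable, Dict, Optional, Sequence


class MiniTaskManager:
    """Stand-in that mimics the documented TaskManager interface."""

    def __init__(self, nb_workers: int = 2, is_lazy_pool_start: bool = True) -> None:
        self._nb_workers = nb_workers
        self._is_lazy = is_lazy_pool_start
        self._pool: Optional[ThreadPool] = None
        self._started = False
        self._next_id = 0
        self._results: Dict[int, AsyncResult] = {}

    @property
    def is_started(self) -> bool:
        return self._started

    def start(self) -> None:
        self._started = True
        if not self._is_lazy:
            self._pool = ThreadPool(self._nb_workers)

    def enqueue_task(
        self,
        func: Callable,
        args: Sequence = (),
        kwargs: Optional[Dict[str, Any]] = None,
    ) -> int:
        if not self._started:
            raise ValueError("Task manager not running.")
        if self._pool is None:
            # Lazy start: the pool is created on the first enqueue.
            self._pool = ThreadPool(self._nb_workers)
        task_id = self._next_id
        self._next_id += 1
        self._results[task_id] = self._pool.apply_async(
            func, args=args, kwds=kwargs or {}
        )
        return task_id

    def get_task_result(self, task_id: int) -> AsyncResult:
        return self._results[task_id]

    def stop(self) -> None:
        if self._pool is not None:
            self._pool.close()  # accept no new work
            self._pool.join()   # wait for queued tasks to finish
            self._pool = None
        self._started = False
```

A caller would start the manager, enqueue a callable, and then call get(timeout=...) on the returned AsyncResult to block until the value is ready.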

ThreadedTaskManager Objects

class ThreadedTaskManager(TaskManager)

A threaded task manager.

__init__

 | __init__(nb_workers: int = DEFAULT_WORKERS_AMOUNT, is_lazy_pool_start: bool = True, logger: Optional[logging.Logger] = None) -> None

Initialize the task manager.

Arguments:

  • nb_workers: the number of worker threads.
  • is_lazy_pool_start: option to postpone pool creation until the first enqueue_task call.
  • logger: the logger.

ProcessTaskManager Objects

class ProcessTaskManager(TaskManager)

A multiprocess task manager.

__init__

 | __init__(nb_workers: int = DEFAULT_WORKERS_AMOUNT, is_lazy_pool_start: bool = True, logger: Optional[logging.Logger] = None) -> None

Initialize the task manager.

Arguments:

  • nb_workers: the number of worker processes.
  • is_lazy_pool_start: option to postpone pool creation until the first enqueue_task call.
  • logger: the logger.