diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml
index 925ce42f97..d24e452db8 100644
--- a/.github/workflows/workflow.yml
+++ b/.github/workflows/workflow.yml
@@ -53,6 +53,8 @@ jobs:
run: |
tox -e black-check
tox -e flake8
+ - name: Unused code check
+ run: tox -e vulture
- name: Static type check
run: tox -e mypy
- name: Golang code style check
@@ -97,7 +99,7 @@ jobs:
pip install tox
# install IPFS
sudo apt-get install -y wget
- wget -O ./go-ipfs.tar.gz https://dist.ipfs.io/go-ipfs/v0.4.23/go-ipfs_v0.4.23_linux-amd64.tar.gz
+ wget -O ./go-ipfs.tar.gz https://dist.ipfs.io/go-ipfs/v0.6.0/go-ipfs_v0.6.0_linux-amd64.tar.gz
tar xvfz go-ipfs.tar.gz
sudo mv go-ipfs/ipfs /usr/local/bin/ipfs
ipfs init
@@ -140,7 +142,7 @@ jobs:
sudo apt-get install -y protobuf-compiler
- name: Sync AEA loop integration tests
run: |
- tox -e py3.8 -- --aea-loop sync -m 'sync'
+ tox -e py3.8 -- -m 'sync' # --aea-loop sync
- name: Async integration tests
run: tox -e py3.8 -- -m 'integration and not unstable and not ledger'
@@ -241,7 +243,7 @@ jobs:
sudo apt-get autoclean
pip install tox
sudo apt-get install -y protobuf-compiler
- - name: Unit tests
+ - name: Unit tests with sync agent loop
run: |
tox -e py3.8 -- --aea-loop sync -m 'not integration and not unstable'
@@ -277,7 +279,7 @@ jobs:
- integration_checks
- integration_checks_ledger
- platform_checks
- - platform_checks_sync_aea_loop
+# - platform_checks_sync_aea_loop
runs-on: ubuntu-latest
timeout-minutes: 60
steps:
diff --git a/.pylintrc b/.pylintrc
index 3465e8ea7d..f289c246a0 100644
--- a/.pylintrc
+++ b/.pylintrc
@@ -1,68 +1,34 @@
[MASTER]
-ignore-patterns=serialization.py,message.py,__main__.py,.*_pb2.py,launch.py,transaction.py
+ignore-patterns=serialization.py,message.py,__main__.py,.*_pb2.py
[MESSAGES CONTROL]
-disable=C0103,C0201,C0330,C0301,C0302,W1202,W1203,W0511,W0107,W0105,W0621,W0235,W0613,W0221,R0902,R0913,R0914,R1720,R1705,R0801,R0904,R0903,R0911,R0912,R0901,R1704,R0916,R1702,R0915,R1710,R1703,R0401
+disable=C0103,C0201,C0301,C0302,C0330,W0105,W0107,W1202,W1203,R0902,R0913,R0914,R0801,R0904,R0903,R0911,R0912,R0901,R0916,R1702,R0915
-ENABLED:
-# W0703: broad-except
-# W0212: protected-access
-# W0706: try-except-raise
-# W0108: unnecessary-lambda
-# W0622: redefined-builtin
-# W0163: unused-argument
-# W0201: attribute-defined-outside-init
-# W0222: signature-differs
-# W0223: abstract-method
-# W0611: unused-import
-# W0612: unused-variable
-# W1505: deprecated-method
-# W0106: expression-not-assigned
-# R0201: no-self-use
-# R0205: useless-object-inheritance
-# R1723: no-else-break
-# R1721: unnecessary-comprehension
-# R1718: consider-using-set-comprehension
-# R1716: chained-comparison
-# R1714: consider-using-in
-# R0123: literal-comparison
-# R1711: useless-return
-# R1722: consider-using-sys-exit
-
-## Resolve these:
-# R0401: cyclic-import
-# W0221: arguments-differ
+## Eventually resolve these:
# R0902: too-many-instance-attributes
# R0913: too-many-arguments
# R0914: too-many-locals
-# R1720: no-else-raise
-# R1705: no-else-return
# R0904: too-many-public-methods
# R0903: too-few-public-methods
# R0911: too-many-return-statements
# R0912: too-many-branches
# R0901: too-many-ancestors
-# R1704: redefined-argument-from-local
# R0916: too-many-boolean-expressions
# R1702: too-many-nested-blocks
# R0915: too-many-statements
-# R1710: inconsistent-return-statements
-# R1703: simplifiable-if-statement
+# decide on a logging policy:
+# W1202: logging-format-interpolation
+# W1203: logging-fstring-interpolation
## Keep the following:
# C0103: invalid-name
# C0201: consider-iterating-dictionary
-# C0330: Wrong haning indentation
-# http://pylint-messages.wikidot.com/messages:c0301 > Line too long (%s/%s)
-# http://pylint-messages.wikidot.com/messages:c0302 > Too many lines in module (%s)
-# W1202: logging-format-interpolation
-# W1203: logging-fstring-interpolation
-# W0511: fixme
-# W0107: unnecessary-pass
-# W0105: pointless-string-statement
-# W0621: redefined-outer-name
-# W0235: useless-super-delegation
-# R0801: similar lines
+# C0301: http://pylint-messages.wikidot.com/messages:c0301 > Line too long (%s/%s)
+# C0302: http://pylint-messages.wikidot.com/messages:c0302 > Too many lines in module (%s)
+# C0330: Wrong hanging indentation
+# W0105: pointless-string-statement, # kept as no harm
+# W0107: unnecessary-pass, # kept as no harm
+# R0801: similar lines, # too granular
[IMPORTS]
ignored-modules=aiohttp,defusedxml,gym,fetch,matplotlib,memory_profiler,numpy,oef,openapi_core,psutil,tensorflow,temper,skimage,vyper,web3
diff --git a/AUTHORS.md b/AUTHORS.md
index 7f0cd1e391..e6ed8ddf47 100644
--- a/AUTHORS.md
+++ b/AUTHORS.md
@@ -12,3 +12,4 @@ This is the official list of Fetch.AI authors for copyright purposes.
* Yuri Turchenkov
diff --git a/aea/__version__.py b/aea/__version__.py
index 3891260ff4..b16a79a5d0 100644
--- a/aea/__version__.py
+++ b/aea/__version__.py
@@ -22,7 +22,7 @@
__title__ = "aea"
__description__ = "Autonomous Economic Agent framework"
__url__ = "https://github.com/fetchai/agents-aea.git"
-__version__ = "0.5.4"
+__version__ = "0.6.0"
__author__ = "Fetch.AI Limited"
__license__ = "Apache-2.0"
__copyright__ = "2019 Fetch.AI Limited"
diff --git a/aea/abstract_agent.py b/aea/abstract_agent.py
new file mode 100644
index 0000000000..cb3e617dc1
--- /dev/null
+++ b/aea/abstract_agent.py
@@ -0,0 +1,125 @@
+# -*- coding: utf-8 -*-
+# ------------------------------------------------------------------------------
+#
+# Copyright 2018-2019 Fetch.AI Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ------------------------------------------------------------------------------
+"""This module contains the interface definition of the abstract agent."""
+import datetime
+from abc import ABC, abstractmethod, abstractproperty
+from typing import Any, Callable, Dict, List, Optional, Tuple
+
+from aea.connections.base import Connection
+from aea.mail.base import Envelope
+
+
+class AbstractAgent(ABC):
+ """This class provides an abstract base interface for an agent."""
+
+ @abstractproperty
+ def name(self) -> str:
+ """Get agent's name."""
+
+ @abstractmethod
+ def start(self) -> None:
+ """
+ Start the agent.
+
+ :return: None
+ """
+
+ @abstractmethod
+ def stop(self) -> None:
+ """
+ Stop the agent.
+
+ :return: None
+ """
+
+ @abstractmethod
+ def setup(self) -> None:
+ """
+ Set up the agent.
+
+ :return: None
+ """
+
+ @abstractmethod
+ def act(self) -> None:
+ """
+ Perform actions on period.
+
+ :return: None
+ """
+
+ @abstractmethod
+ def handle_envelope(self, envelope: Envelope) -> None:
+ """
+ Handle an envelope.
+
+ :param envelope: the envelope to handle.
+ :return: None
+ """
+
+ @abstractmethod
+ def get_periodic_tasks(
+ self,
+ ) -> Dict[Callable, Tuple[float, Optional[datetime.datetime]]]:
+ """
+ Get all periodic tasks for agent.
+
+ :return: dict of callable with period specified
+ """
+
+ @abstractmethod
+ def get_message_handlers(self) -> List[Tuple[Callable[[Any], None], Callable]]:
+ """
+ Get handlers with message getters.
+
+ :return: List of tuples of callables: handler and coroutine to get a message
+ """
+
+ @abstractmethod
+ def get_multiplexer_setup_options(self) -> Optional[Dict]:
+ """
+ Get options to pass to Multiplexer.setup.
+
+ :return: dict of kwargs
+ """
+
+ @abstractproperty
+ def connections(self) -> List[Connection]:
+ """Return list of connections."""
+
+ @abstractmethod
+ def exception_handler(
+ self, exception: Exception, function: Callable
+ ) -> Optional[bool]:
+ """
+ Handle exception raised during agent main loop execution.
+
+ :param exception: exception raised
+ :param function: a callable exception raised in.
+
+ :return: skip exception if True, otherwise re-raise it
+ """
+
+ @abstractmethod
+ def teardown(self) -> None:
+ """
+ Tear down the agent.
+
+ :return: None
+ """
diff --git a/aea/aea.py b/aea/aea.py
index fdb96bcd82..181a234a39 100644
--- a/aea/aea.py
+++ b/aea/aea.py
@@ -16,24 +16,38 @@
# limitations under the License.
#
# ------------------------------------------------------------------------------
+
"""This module contains the implementation of an autonomous economic agent (AEA)."""
+import datetime
import logging
from asyncio import AbstractEventLoop
-from typing import Any, Callable, Collection, Dict, List, Optional, Sequence, Type, cast
+from multiprocessing.pool import AsyncResult
+from typing import (
+ Any,
+ Callable,
+ Collection,
+ Dict,
+ List,
+ Optional,
+ Sequence,
+ Tuple,
+ Type,
+ cast,
+)
from aea.agent import Agent
from aea.agent_loop import AsyncAgentLoop, BaseAgentLoop, SyncAgentLoop
from aea.configurations.base import PublicId
from aea.configurations.constants import DEFAULT_SKILL
+from aea.connections.base import Connection
from aea.context.base import AgentContext
from aea.crypto.wallet import Wallet
-from aea.decision_maker.base import DecisionMaker, DecisionMakerHandler
+from aea.decision_maker.base import DecisionMakerHandler
from aea.decision_maker.default import (
DecisionMakerHandler as DefaultDecisionMakerHandler,
)
from aea.exceptions import AEAException
from aea.helpers.exception_policy import ExceptionPolicyEnum
-from aea.helpers.exec_timeout import ExecTimeoutThreadGuard, TimeoutException
from aea.helpers.logging import AgentLoggerAdapter, WithLogger
from aea.identity.base import Identity
from aea.mail.base import Envelope
@@ -41,17 +55,16 @@
from aea.protocols.default.message import DefaultMessage
from aea.registries.filter import Filter
from aea.registries.resources import Resources
-from aea.skills.base import Behaviour, Handler, SkillComponent
+from aea.skills.base import Behaviour, Handler
from aea.skills.error.handlers import ErrorHandler
-from aea.skills.tasks import TaskManager
class AEA(Agent, WithLogger):
"""This class implements an autonomous economic agent."""
RUN_LOOPS: Dict[str, Type[BaseAgentLoop]] = {
- "sync": SyncAgentLoop,
"async": AsyncAgentLoop,
+ "sync": SyncAgentLoop,
}
DEFAULT_RUN_LOOP: str = "async"
@@ -61,7 +74,7 @@ def __init__(
wallet: Wallet,
resources: Resources,
loop: Optional[AbstractEventLoop] = None,
- timeout: float = 0.05,
+ period: float = 0.05,
execution_timeout: float = 0,
max_reactions: int = 20,
decision_maker_handler_class: Type[
@@ -83,7 +96,7 @@ def __init__(
:param wallet: the wallet of the agent.
:param resources: the resources (protocols and skills) of the agent.
:param loop: the event loop to run the connections.
- :param timeout: the time in (fractions of) seconds to time out an agent between act and react
+ :param period: period to call agent's act
:param exeution_timeout: amount of time to limit single act/handle to execute.
:param max_reactions: the processing rate of envelopes per tick (i.e. single loop).
:param decision_maker_handler_class: the class implementing the decision maker handler to be used.
@@ -102,7 +115,7 @@ def __init__(
identity=identity,
connections=[],
loop=loop,
- timeout=timeout,
+ period=period,
loop_mode=loop_mode,
runtime_mode=runtime_mode,
)
@@ -112,20 +125,18 @@ def __init__(
WithLogger.__init__(self, logger=cast(logging.Logger, aea_logger))
self.max_reactions = max_reactions
- self._task_manager = TaskManager()
decision_maker_handler = decision_maker_handler_class(
identity=identity, wallet=wallet
)
- self._decision_maker = DecisionMaker(
- decision_maker_handler=decision_maker_handler
- )
+ self.runtime.set_decision_maker(decision_maker_handler)
+
self._context = AgentContext(
self.identity,
- self.multiplexer.connection_status,
+ self.runtime.multiplexer.connection_status,
self.outbox,
- self.decision_maker.message_in_queue,
+ self.runtime.decision_maker.message_in_queue,
decision_maker_handler.context,
- self.task_manager,
+ self.runtime.task_manager,
default_connection,
default_routing if default_routing is not None else {},
search_service_address,
@@ -134,17 +145,14 @@ def __init__(
self._execution_timeout = execution_timeout
self._connection_ids = connection_ids
self._resources = resources
- self._filter = Filter(self.resources, self.decision_maker.message_out_queue)
+ self._filter = Filter(
+ self.resources, self.runtime.decision_maker.message_out_queue
+ )
self._skills_exception_policy = skill_exception_policy
self._setup_loggers()
- @property
- def decision_maker(self) -> DecisionMaker:
- """Get decision maker."""
- return self._decision_maker
-
@property
def context(self) -> AgentContext:
"""Get (agent) context."""
@@ -160,24 +168,6 @@ def resources(self, resources: "Resources") -> None:
"""Set resources."""
self._resources = resources
- @property
- def task_manager(self) -> TaskManager:
- """Get the task manager."""
- return self._task_manager
-
- def setup_multiplexer(self) -> None:
- """Set up the multiplexer."""
- connections = self.resources.get_all_connections()
- if self._connection_ids is not None:
- connections = [
- c for c in connections if c.connection_id in self._connection_ids
- ]
- self.multiplexer.setup(
- connections,
- default_routing=self.context.default_routing,
- default_connection=self.context.default_connection,
- )
-
@property
def filter(self) -> Filter:
"""Get the filter."""
@@ -195,16 +185,11 @@ def setup(self) -> None:
Performs the following:
- loads the resources (unless in programmatic mode)
- - starts the task manager
- - starts the decision maker
- calls setup() on the resources
:return: None
"""
- self.task_manager.start()
- self.decision_maker.start()
self.resources.setup()
- ExecTimeoutThreadGuard.start()
def act(self) -> None:
"""
@@ -214,79 +199,62 @@ def act(self) -> None:
:return: None
"""
- for behaviour in self.active_behaviours:
- self._behaviour_act(behaviour)
-
- def react(self) -> None:
- """
- React to incoming envelopes.
-
- Gets up to max_reactions number of envelopes from the inbox and
- handles each envelope, which entailes:
+ self.filter.handle_new_handlers_and_behaviours()
- - fetching the protocol referenced by the envelope, and
- - returning an envelope to sender if the protocol is unsupported, using the error handler, or
- - returning an envelope to sender if there is a decoding error, using the error handler, or
- - returning an envelope to sender if no active handler is available for the specified protocol, using the error handler, or
- - handling the message recovered from the envelope with all active handlers for the specified protocol.
-
- :return: None
- """
- counter = 0
- while not self.inbox.empty() and counter < self.max_reactions:
- counter += 1
- self._react_one()
+ @property
+ def active_connections(self) -> List[Connection]:
+ """Return list of active connections."""
+ connections = self.resources.get_all_connections()
+ if self._connection_ids is not None:
+ connections = [
+ c for c in connections if c.connection_id in self._connection_ids
+ ]
+ return connections
- def _react_one(self) -> None:
+ def get_multiplexer_setup_options(self) -> Optional[Dict]:
"""
- Get and process one envelop from inbox.
+ Get options to pass to Multiplexer.setup.
- :return: None
+ :return: dict of kwargs
"""
- envelope = self.inbox.get_nowait() # type: Optional[Envelope]
- if envelope is not None:
- self._handle(envelope)
+ return dict(
+ connections=self.active_connections,
+ default_routing=self.context.default_routing,
+ default_connection=self.context.default_connection,
+ )
def _get_error_handler(self) -> Optional[Handler]:
- """Get error hadnler."""
+ """Get error handler."""
return self.resources.get_handler(DefaultMessage.protocol_id, DEFAULT_SKILL)
- def _handle(self, envelope: Envelope) -> None:
- """
- Handle an envelope.
-
- :param envelope: the envelope to handle.
- :return: None
- """
- self.logger.debug("Handling envelope: {}".format(envelope))
+ def _get_msg_and_handlers_for_envelope(
+ self, envelope: Envelope
+ ) -> Tuple[Optional[Message], List[Handler]]:
protocol = self.resources.get_protocol(envelope.protocol_id)
- # TODO specify error handler in config and make this work for different skill/protocol versions.
error_handler = self._get_error_handler()
if error_handler is None:
self.logger.warning("ErrorHandler not initialized. Stopping AEA!")
self.stop()
- return
+ return None, []
error_handler = cast(ErrorHandler, error_handler)
if protocol is None:
error_handler.send_unsupported_protocol(envelope)
- return
+ return None, []
- try:
- if isinstance(envelope.message, Message):
- msg = envelope.message
- else:
+ if isinstance(envelope.message, Message):
+ msg = envelope.message
+ else:
+ try:
msg = protocol.serializer.decode(envelope.message)
- msg.counterparty = envelope.sender
- msg.sender = envelope.sender
- # msg.to = envelope.to
- msg.is_incoming = True
- except Exception as e: # pylint: disable=broad-except # thats ok, because we send the decoding error back
- self.logger.warning("Decoding error. Exception: {}".format(str(e)))
- error_handler.send_decoding_error(envelope)
- return
+ msg.sender = envelope.sender
+ msg.to = envelope.to
+ except Exception as e: # pylint: disable=broad-except # thats ok, because we send the decoding error back
+ self.logger.warning("Decoding error. Exception: {}".format(str(e)))
+ error_handler.send_decoding_error(envelope)
+ return None, []
handlers = self.filter.get_active_handlers(
protocol.public_id, envelope.skill_id
@@ -294,86 +262,114 @@ def _handle(self, envelope: Envelope) -> None:
if len(handlers) == 0:
error_handler.send_unsupported_skill(envelope)
- return
+ return None, []
- for handler in handlers:
- self._handle_message_with_handler(msg, handler)
+ return msg, handlers
- def _handle_message_with_handler(self, message: Message, handler: Handler) -> None:
+ def handle_envelope(self, envelope: Envelope) -> None:
"""
- Handle one message with one predefined handler.
+ Handle an envelope.
+
+ - fetching the protocol referenced by the envelope, and
+ - returning an envelope to sender if the protocol is unsupported, using the error handler, or
+ - returning an envelope to sender if there is a decoding error, using the error handler, or
+ - returning an envelope to sender if no active handler is available for the specified protocol, using the error handler, or
+ - handling the message recovered from the envelope with all active handlers for the specified protocol.
- :param message: message to be handled.
- :param handler: handler suitable for this message protocol.
+ :param envelope: the envelope to handle.
+ :return: None
"""
- self._execution_control(handler.handle, handler, [message])
+ self.logger.debug("Handling envelope: {}".format(envelope))
+ msg, handlers = self._get_msg_and_handlers_for_envelope(envelope)
- def _behaviour_act(self, behaviour: Behaviour) -> None:
+ if msg is None:
+ return
+
+ for handler in handlers:
+ handler.handle(msg)
+
+ def _setup_loggers(self):
+ """Set up logger with agent name."""
+ for element in [
+ self.runtime.main_loop,
+ self.runtime.multiplexer,
+ self.runtime.task_manager,
+ self.resources.component_registry,
+ self.resources.behaviour_registry,
+ self.resources.handler_registry,
+ self.resources.model_registry,
+ ]:
+ element.logger = AgentLoggerAdapter(
+ element.logger, agent_name=self._identity.name
+ )
+
+ def get_periodic_tasks(
+ self,
+ ) -> Dict[Callable, Tuple[float, Optional[datetime.datetime]]]:
"""
- Call behaviour's act.
+ Get all periodic tasks for agent.
- :param behaviour: behaviour already defined
- :return: None
+ :return: dict of callable with period specified
"""
- self._execution_control(behaviour.act_wrapper, behaviour)
+ tasks = super().get_periodic_tasks()
+ tasks.update(self._get_behaviours_tasks())
+ return tasks
- def _execution_control(
+ def _get_behaviours_tasks(
self,
- fn: Callable,
- component: SkillComponent,
- args: Optional[Sequence] = None,
- kwargs: Optional[Dict] = None,
- ) -> Any:
+ ) -> Dict[Callable, Tuple[float, Optional[datetime.datetime]]]:
"""
- Execute skill function in exception handling environment.
+ Get all periodic tasks for AEA behaviours.
- Logs error, stop agent or propagate excepion depends on policy defined.
+ :return: dict of callable with period specified
+ """
+ tasks = {}
- :param fn: function to call
- :param component: skill component function belongs to
- :param args: optional sequence of arguments to pass to function on call
- :param kwargs: optional dict of keyword arguments to pass to function on call
+ for behaviour in self.active_behaviours:
+ tasks[behaviour.act_wrapper] = (behaviour.tick_interval, behaviour.start_at)
+
+ return tasks
- :return: same as function
+ def get_message_handlers(self) -> List[Tuple[Callable[[Any], None], Callable]]:
"""
- # docstyle: ignore
- def log_exception(e, fn, component):
- self.logger.exception(f"<{e}> raised during `{fn}` call of `{component}`")
-
- try:
- with ExecTimeoutThreadGuard(self._execution_timeout):
- return fn(*(args or []), **(kwargs or {}))
- except TimeoutException:
- self.logger.warning(
- "`{}` of `{}` was terminated as its execution exceeded the timeout of {} seconds. Please refactor your code!".format(
- fn, component, self._execution_timeout
- )
- )
- except Exception as e: # pylint: disable=broad-except
- if self._skills_exception_policy == ExceptionPolicyEnum.propagate:
- raise
- elif self._skills_exception_policy == ExceptionPolicyEnum.just_log:
- log_exception(e, fn, component)
- elif self._skills_exception_policy == ExceptionPolicyEnum.stop_and_exit:
- log_exception(e, fn, component)
- self.stop()
- raise AEAException(
- f"AEA was terminated cause exception `{e}` in skills {component} {fn}! Please check logs."
- )
- else:
- raise AEAException(
- f"Unsupported exception policy: {self._skills_exception_policy}"
- )
-
- def update(self) -> None:
+ Get handlers with message getters.
+
+ :return: List of tuples of callables: handler and coroutine to get a message
+ """
+ return super(AEA, self).get_message_handlers() + [
+ (self.filter.handle_internal_message, self.filter.get_internal_message,),
+ ]
+
+ def exception_handler(self, exception: Exception, function: Callable) -> bool:
"""
- Update the current state of the agent.
+ Handle exception raised during agent main loop execution.
- Handles the internal messages from the skills to the decision maker.
+ :param exception: exception raised
+ :param function: a callable exception raised in.
- :return None
+ :return: bool, propagate exception if True otherwise skip it.
"""
- self.filter.handle_internal_messages() # pragma: nocover
+ # docstyle: ignore # noqa: E800
+ def log_exception(e, fn):
+ self.logger.exception(f"<{e}> raised during `{fn}`")
+
+ if self._skills_exception_policy == ExceptionPolicyEnum.propagate:
+ return True
+
+ if self._skills_exception_policy == ExceptionPolicyEnum.stop_and_exit:
+ log_exception(exception, function)
+ self.stop()
+ raise AEAException(
+ f"AEA was terminated cause exception `{exception}` in skills {function}! Please check logs."
+ )
+
+ if self._skills_exception_policy == ExceptionPolicyEnum.just_log:
+ log_exception(exception, function)
+ return False
+
+ raise AEAException(
+ f"Unsupported exception policy: {self._skills_exception_policy}"
+ )
def teardown(self) -> None:
"""
@@ -381,29 +377,31 @@ def teardown(self) -> None:
Performs the following:
- - stops the decision maker
- - stops the task manager
- tears down the resources.
:return: None
"""
self.logger.debug("[{}]: Calling teardown method...".format(self.name))
- self.decision_maker.stop()
- self.task_manager.stop()
self.resources.teardown()
- ExecTimeoutThreadGuard.stop()
- def _setup_loggers(self):
- """Set up logger with agent name."""
- for element in [
- self.main_loop,
- self.multiplexer,
- self.task_manager,
- self.resources.component_registry,
- self.resources.behaviour_registry,
- self.resources.handler_registry,
- self.resources.model_registry,
- ]:
- element.logger = AgentLoggerAdapter(
- element.logger, agent_name=self._identity.name
- )
+ def get_task_result(self, task_id: int) -> AsyncResult:
+ """
+ Get the result from a task.
+
+ :return: async result for task_id
+ """
+ return self.runtime.task_manager.get_task_result(task_id)
+
+ def enqueue_task(
+ self, func: Callable, args: Sequence = (), kwds: Optional[Dict[str, Any]] = None
+ ) -> int:
+ """
+ Enqueue a task with the task manager.
+
+ :param func: the callable instance to be enqueued
+ :param args: the positional arguments to be passed to the function.
+ :param kwds: the keyword arguments to be passed to the function.
+ :return the task id to get the the result.
+ :raises ValueError: if the task manager is not running.
+ """
+ return self.runtime.task_manager.enqueue_task(func, args, kwds)
diff --git a/aea/aea_builder.py b/aea/aea_builder.py
index a7fa324122..01e3cf3ca1 100644
--- a/aea/aea_builder.py
+++ b/aea/aea_builder.py
@@ -47,7 +47,7 @@
from aea import AEA_DIR
from aea.aea import AEA
-from aea.components.base import Component
+from aea.components.base import Component, load_aea_package
from aea.components.loader import load_component_from_config
from aea.configurations.base import (
AgentConfig,
@@ -69,8 +69,8 @@
DEFAULT_PROTOCOL,
DEFAULT_SKILL,
)
-from aea.configurations.loader import ConfigLoader
-from aea.contracts import contract_registry
+from aea.configurations.loader import ConfigLoader, load_component_configuration
+from aea.configurations.pypi import is_satisfiable, merge_dependencies
from aea.crypto.helpers import verify_or_create_private_keys
from aea.crypto.wallet import Wallet
from aea.decision_maker.base import DecisionMakerHandler
@@ -78,11 +78,9 @@
DecisionMakerHandler as DefaultDecisionMakerHandler,
)
from aea.exceptions import AEAException
-from aea.helpers.base import load_aea_package, load_module
+from aea.helpers.base import load_module
from aea.helpers.exception_policy import ExceptionPolicyEnum
from aea.helpers.logging import AgentLoggerAdapter
-from aea.helpers.pypi import is_satisfiable
-from aea.helpers.pypi import merge_dependencies
from aea.identity.base import Identity
from aea.registries.resources import Resources
@@ -282,7 +280,7 @@ class AEABuilder:
"""
- DEFAULT_AGENT_LOOP_TIMEOUT = 0.05
+ DEFAULT_AGENT_ACT_PERIOD = 0.05 # seconds
DEFAULT_EXECUTION_TIMEOUT = 0
DEFAULT_MAX_REACTIONS = 20
DEFAULT_DECISION_MAKER_HANDLER_CLASS: Type[
@@ -326,25 +324,28 @@ def _reset(self, is_full_reset: bool = False) -> None:
:param is_full_reset: whether it is a full reset or not.
:return: None.
"""
- self._name = None # type: Optional[str]
- self._private_key_paths = {} # type: Dict[str, Optional[str]]
- self._connection_private_key_paths = {} # type: Dict[str, Optional[str]]
+ self._name: Optional[str] = None
+ self._private_key_paths: Dict[str, Optional[str]] = {}
+ self._connection_private_key_paths: Dict[str, Optional[str]] = {}
if not is_full_reset:
self._remove_components_from_dependency_manager()
- self._component_instances = {
+ self._component_instances: Dict[
+ ComponentType, Dict[ComponentConfiguration, Component]
+ ] = {
ComponentType.CONNECTION: {},
ComponentType.CONTRACT: {},
ComponentType.PROTOCOL: {},
ComponentType.SKILL: {},
- } # type: Dict[ComponentType, Dict[ComponentConfiguration, Component]]
+ }
+ self._custom_component_configurations: Dict[ComponentId, Dict] = {}
self._to_reset: bool = False
self._build_called: bool = False
if not is_full_reset:
return
self._default_ledger = DEFAULT_LEDGER
self._default_connection: PublicId = DEFAULT_CONNECTION
- self._context_namespace = {} # type: Dict[str, Any]
- self._timeout: Optional[float] = None
+ self._context_namespace: Dict[str, Any] = {}
+ self._period: Optional[float] = None
self._execution_timeout: Optional[float] = None
self._max_reactions: Optional[int] = None
self._decision_maker_handler_class: Optional[Type[DecisionMakerHandler]] = None
@@ -366,15 +367,15 @@ def _remove_components_from_dependency_manager(self) -> None:
component_config.component_id
)
- def set_timeout(self, timeout: Optional[float]) -> "AEABuilder":
+ def set_period(self, period: Optional[float]) -> "AEABuilder":
"""
- Set agent loop idle timeout in seconds.
+ Set agent act period.
- :param timeout: timeout in seconds
+ :param period: period in seconds
:return: self
"""
- self._timeout = timeout
+ self._period = period
return self
def set_execution_timeout(self, execution_timeout: Optional[float]) -> "AEABuilder":
@@ -627,7 +628,7 @@ def add_component(
:return: the AEABuilder
"""
directory = Path(directory)
- configuration = ComponentConfiguration.load(
+ configuration = load_component_configuration(
component_type, directory, skip_consistency_check
)
self._check_can_add(configuration)
@@ -765,10 +766,11 @@ def _build_identity_from_wallet(self, wallet: Wallet) -> Identity:
:param wallet: the wallet
:return: the identity
"""
- assert self._name is not None, "You must set the name of the agent."
+ if self._name is None: # pragma: nocover
+ raise ValueError("You must set the name of the agent.")
if not wallet.addresses:
- raise ValueError("wallet has no addresses")
+ raise ValueError("Wallet has no addresses.")
if len(wallet.addresses) > 1:
identity = Identity(
@@ -869,7 +871,7 @@ def build(self, connection_ids: Optional[Collection[PublicId]] = None,) -> AEA:
wallet,
resources,
loop=None,
- timeout=self._get_agent_loop_timeout(),
+ period=self._get_agent_act_period(),
execution_timeout=self._get_execution_timeout(),
is_debug=False,
max_reactions=self._get_max_reactions(),
@@ -887,20 +889,15 @@ def build(self, connection_ids: Optional[Collection[PublicId]] = None,) -> AEA:
ComponentType.SKILL, resources, identity.name, agent_context=aea.context
)
self._build_called = True
- self._populate_contract_registry()
return aea
- def _get_agent_loop_timeout(self) -> float:
+ def _get_agent_act_period(self) -> float:
"""
- Return agent loop idle timeout.
+ Return agent act period.
- :return: timeout in seconds if set else default value.
+ :return: period in seconds if set else default value.
"""
- return (
- self._timeout
- if self._timeout is not None
- else self.DEFAULT_AGENT_LOOP_TIMEOUT
- )
+ return self._period or self.DEFAULT_AGENT_ACT_PERIOD
def _get_execution_timeout(self) -> float:
"""
@@ -1131,7 +1128,7 @@ def set_from_configuration(
self.set_default_connection(
PublicId.from_str(agent_configuration.default_connection)
)
- self.set_timeout(agent_configuration.timeout)
+ self.set_period(agent_configuration.period)
self.set_execution_timeout(agent_configuration.execution_timeout)
self.set_max_reactions(agent_configuration.max_reactions)
if agent_configuration.decision_maker_handler != {}:
@@ -1216,6 +1213,9 @@ def set_from_configuration(
component_path,
skip_consistency_check=skip_consistency_check,
)
+ self._custom_component_configurations = (
+ agent_configuration.component_configurations
+ )
def _find_import_order(
self,
@@ -1244,7 +1244,7 @@ def _find_import_order(
)
configuration = cast(
SkillConfig,
- ComponentConfiguration.load(
+ load_component_configuration(
skill_id.component_type, component_path, skip_consistency_check
),
)
@@ -1268,9 +1268,7 @@ def _find_import_order(
while len(queue) > 0:
current = queue.pop()
order.append(current)
- for node in supports[
- current
- ]: # pragma: nocover # TODO: extract method and test properly
+ for node in supports[current]: # pragma: nocover
depends_on[node].discard(current)
if len(depends_on[node]) == 0:
queue.append(node)
@@ -1305,9 +1303,6 @@ def from_aea_project(
)
builder = AEABuilder(with_default_packages=False)
- # TODO isolate environment
- # load_env_file(str(aea_config_path / ".env"))
-
# load agent configuration file
configuration_file = aea_project_path / DEFAULT_AEA_CONFIG_FILE
@@ -1350,6 +1345,11 @@ def _load_and_add_components(
)
else:
configuration = deepcopy(configuration)
+ configuration.update(
+ self._custom_component_configurations.get(
+ configuration.component_id, {}
+ )
+ )
_logger = make_logger(configuration, agent_name)
component = load_component_from_config(
configuration, logger=_logger, **kwargs
@@ -1357,37 +1357,6 @@ def _load_and_add_components(
resources.add_component(component)
- def _populate_contract_registry(self):
- """Populate contract registry."""
- for configuration in self._package_dependency_manager.get_components_by_type(
- ComponentType.CONTRACT
- ).values():
- configuration = cast(ContractConfig, configuration)
- if str(configuration.public_id) in contract_registry.specs:
- logger.warning(
- f"Skipping registration of contract {configuration.public_id} since already registered."
- )
- continue
- logger.debug( # pragma: nocover
- f"Registering contract {configuration.public_id}"
- )
- try: # pragma: nocover
- contract_registry.register(
- id_=str(configuration.public_id),
- entry_point=f"{configuration.prefix_import_path}.contract:{configuration.class_name}",
- class_kwargs={
- "contract_interface": configuration.contract_interfaces
- },
- contract_config=configuration, # TODO: resolve configuration being applied globally
- )
- except AEAException as e: # pragma: nocover
- if "Cannot re-register id:" in str(e):
- logger.warning(
- "Already registered: {}".format(configuration.class_name)
- )
- else:
- raise e
-
def _check_we_can_build(self):
if self._build_called and self._to_reset:
raise ValueError(
diff --git a/aea/agent.py b/aea/agent.py
index 31e1270a4b..b62acaadf6 100644
--- a/aea/agent.py
+++ b/aea/agent.py
@@ -17,30 +17,25 @@
#
# ------------------------------------------------------------------------------
"""This module contains the implementation of a generic agent."""
-
+import datetime
import logging
-from abc import ABC, abstractmethod
from asyncio import AbstractEventLoop
-from typing import Dict, List, Optional, Type
+from typing import Any, Callable, Dict, List, Optional, Tuple, Type
-from aea.agent_loop import BaseAgentLoop, SyncAgentLoop
+from aea.abstract_agent import AbstractAgent
from aea.connections.base import Connection
from aea.identity.base import Identity
-from aea.multiplexer import InBox, Multiplexer, OutBox
+from aea.mail.base import Envelope
+from aea.multiplexer import InBox, OutBox
from aea.runtime import AsyncRuntime, BaseRuntime, RuntimeStates, ThreadedRuntime
logger = logging.getLogger(__name__)
-class Agent(ABC):
+class Agent(AbstractAgent):
"""This class provides an abstract base class for a generic agent."""
- RUN_LOOPS: Dict[str, Type[BaseAgentLoop]] = {
- "sync": SyncAgentLoop,
- }
- DEFAULT_RUN_LOOP: str = "sync"
-
RUNTIMES: Dict[str, Type[BaseRuntime]] = {
"async": AsyncRuntime,
"threaded": ThreadedRuntime,
@@ -52,7 +47,7 @@ def __init__(
identity: Identity,
connections: List[Connection],
loop: Optional[AbstractEventLoop] = None,
- timeout: float = 1.0,
+ period: float = 1.0,
loop_mode: Optional[str] = None,
runtime_mode: Optional[str] = None,
) -> None:
@@ -62,48 +57,45 @@ def __init__(
:param identity: the identity of the agent.
:param connections: the list of connections of the agent.
:param loop: the event loop to run the connections.
- :param timeout: the time in (fractions of) seconds to time out an agent between act and react
+ :param period: period to call agent's act
:param loop_mode: loop_mode to choose agent run loop.
:param runtime_mode: runtime mode to up agent.
:return: None
"""
- self._identity = identity
self._connections = connections
-
- self._multiplexer = Multiplexer(self._connections, loop=loop)
- self._inbox = InBox(self._multiplexer)
- self._outbox = OutBox(self._multiplexer, identity.address)
- self._timeout = timeout
-
+ self._identity = identity
+ self._period = period
self._tick = 0
-
- self._loop_mode = loop_mode or self.DEFAULT_RUN_LOOP
- loop_cls = self._get_main_loop_class()
- self._main_loop: BaseAgentLoop = loop_cls(self)
-
self._runtime_mode = runtime_mode or self.DEFAULT_RUNTIME
runtime_cls = self._get_runtime_class()
- self._runtime: BaseRuntime = runtime_cls(agent=self, loop=loop)
+ self._runtime: BaseRuntime = runtime_cls(
+ agent=self, loop_mode=loop_mode, loop=loop
+ )
+
+ self._inbox = InBox(self.runtime.multiplexer)
+ self._outbox = OutBox(self.runtime.multiplexer)
+
+ @property
+ def connections(self) -> List[Connection]:
+ """Return list of connections."""
+ return self._connections
@property
- def is_running(self):
+ def active_connections(self) -> List[Connection]:
+ """Return list of active connections."""
+ return self._connections
+
+ @property
+ def is_running(self) -> bool:
"""Get running state of the runtime and agent."""
return self.runtime.is_running
@property
- def is_stopped(self):
+ def is_stopped(self) -> bool:
"""Get running state of the runtime and agent."""
return self.runtime.is_stopped
- def _get_main_loop_class(self) -> Type[BaseAgentLoop]:
- """Get main loop class based on loop mode."""
- if self._loop_mode not in self.RUN_LOOPS:
- raise ValueError(
- f"Loop `{self._loop_mode} is not supported. valid are: `{list(self.RUN_LOOPS.keys())}`"
- )
- return self.RUN_LOOPS[self._loop_mode]
-
def _get_runtime_class(self) -> Type[BaseRuntime]:
"""Get runtime class based on runtime mode."""
if self._runtime_mode not in self.RUNTIMES:
@@ -112,16 +104,19 @@ def _get_runtime_class(self) -> Type[BaseRuntime]:
)
return self.RUNTIMES[self._runtime_mode]
+ def get_multiplexer_setup_options(self) -> Optional[Dict]:
+ """
+ Get options to pass to Multiplexer.setup.
+
+ :return: dict of kwargs
+ """
+ return {"connections": self.active_connections}
+
@property
def identity(self) -> Identity:
"""Get the identity."""
return self._identity
- @property
- def multiplexer(self) -> Multiplexer:
- """Get the multiplexer."""
- return self._multiplexer
-
@property
def inbox(self) -> InBox: # pragma: nocover
"""
@@ -156,30 +151,25 @@ def tick(self) -> int: # pragma: nocover
"""
return self._tick
- @property
- def timeout(self) -> float:
- """Get the time in (fractions of) seconds to time out an agent between act and react."""
- return self._timeout
+ def handle_envelope(self, envelope: Envelope) -> None: # pragma: nocover
+ """
+ Handle an envelope.
- @property
- def loop_mode(self) -> str:
- """Get the agent loop mode."""
- return self._loop_mode
+ :param envelope: the envelope to handle.
+ :return: None
+ """
+ raise NotImplementedError
@property
- def main_loop(self) -> BaseAgentLoop:
- """Get the main agent loop."""
- return self._main_loop
+ def period(self) -> float:
+ """Get a period to call act."""
+ return self._period
@property
def runtime(self) -> BaseRuntime:
"""Get the runtime."""
return self._runtime
- def setup_multiplexer(self) -> None:
- """Set up the multiplexer."""
- pass
-
def start(self) -> None:
"""
Start the agent.
@@ -203,19 +193,6 @@ def start(self) -> None:
"""
self.runtime.start()
- def start_setup(self) -> None:
- """
- Set up Agent on start.
-
- - connect Multiplexer
- - call agent.setup
- - set liveness to started
-
- :return: None
- """
- logger.debug("[{}]: Calling setup method...".format(self.name))
- self.setup()
-
def stop(self) -> None:
"""
Stop the agent.
@@ -230,51 +207,45 @@ def stop(self) -> None:
"""
self.runtime.stop()
- @abstractmethod
- def setup(self) -> None:
- """
- Set up the agent.
-
- :return: None
- """
-
- @abstractmethod
- def act(self) -> None:
- """
- Perform actions.
-
- :return: None
+ @property
+ def state(self) -> RuntimeStates:
"""
+ Get state of the agent's runtime.
- @abstractmethod
- def react(self) -> None:
+ :return: RuntimeStates
"""
- React to events.
+ return self._runtime.state
- :return: None
+ def get_periodic_tasks(
+ self,
+ ) -> Dict[Callable, Tuple[float, Optional[datetime.datetime]]]:
"""
+ Get all periodic tasks for agent.
- @abstractmethod
- def update(self) -> None:
+ :return: dict of callable with period specified
"""
- Update the internals of the agent which are not exposed to the skills.
+ return {self.act: (self.period, None)}
- :return None
+ def get_message_handlers(self) -> List[Tuple[Callable[[Any], None], Callable]]:
"""
+ Get handlers with message getters.
- @abstractmethod
- def teardown(self) -> None:
+ :return: List of tuples of callables: handler and coroutine to get a message
"""
- Tear down the agent.
+ return [(self.handle_envelope, self.inbox.async_get)]
- :return: None
+ def exception_handler(
+ self, exception: Exception, function: Callable
+ ) -> bool: # pragma: nocover
"""
+ Handle exception raised during agent main loop execution.
- @property
- def state(self) -> RuntimeStates:
- """
- Get state of the agent's runtime.
+ :param exception: exception raised
+ :param function: a callable exception raised in.
- :return: RuntimeStates
+ :return: bool, propagate exception if True otherwise skip it.
"""
- return self._runtime.state
+ logger.exception(
+ f"Exception {repr(exception)} raised during {repr(function)} call."
+ )
+ return True
diff --git a/aea/agent_loop.py b/aea/agent_loop.py
index e60a612360..59fff7c62d 100644
--- a/aea/agent_loop.py
+++ b/aea/agent_loop.py
@@ -18,6 +18,7 @@
# ------------------------------------------------------------------------------
"""This module contains the implementation of an agent loop using asyncio."""
import asyncio
+import datetime
import logging
from abc import ABC, abstractmethod
from asyncio import CancelledError
@@ -25,37 +26,28 @@
from asyncio.tasks import Task
from enum import Enum
from functools import partial
-from typing import (
- Callable,
- Dict,
- List,
- Optional,
- TYPE_CHECKING,
-)
+from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple
+from aea.abstract_agent import AbstractAgent
from aea.exceptions import AEAException
from aea.helpers.async_utils import (
AsyncState,
+ HandlerItemGetter,
PeriodicCaller,
ensure_loop,
)
+from aea.helpers.exec_timeout import ExecTimeoutThreadGuard, TimeoutException
from aea.helpers.logging import WithLogger
-from aea.multiplexer import InBox
-from aea.skills.base import Behaviour
logger = logging.getLogger(__name__)
-if TYPE_CHECKING:
- from aea.aea import AEA # pragma: no cover
- from aea.agent import Agent # pragma: no cover
-
class BaseAgentLoop(WithLogger, ABC):
"""Base abstract agent loop class."""
def __init__(
- self, agent: "Agent", loop: Optional[AbstractEventLoop] = None
+ self, agent: AbstractAgent, loop: Optional[AbstractEventLoop] = None
) -> None:
"""Init loop.
@@ -63,24 +55,39 @@ def __init__(
:params loop: optional asyncio event loop. if not specified a new loop will be created.
"""
WithLogger.__init__(self, logger)
- self._agent: "Agent" = agent
+ self._agent: AbstractAgent = agent
self.set_loop(ensure_loop(loop))
self._tasks: List[asyncio.Task] = []
self._state: AsyncState = AsyncState()
self._exceptions: List[Exception] = []
+ @property
+ def agent(self) -> AbstractAgent: # pragma: nocover
+ """Get agent."""
+ return self._agent
+
def set_loop(self, loop: AbstractEventLoop) -> None:
"""Set event loop and all event loopp related objects."""
self._loop: AbstractEventLoop = loop
def start(self) -> None:
"""Start agent loop synchronously in own asyncio loop."""
+ self.setup()
self._loop.run_until_complete(self.run_loop())
+ def setup(self) -> None: # pylint: disable=no-self-use
+ """Set up loop before started."""
+ # start and stop methods are classmethods cause one instance shared across muiltiple threads
+ ExecTimeoutThreadGuard.start()
+
+ def teardown(self): # pylint: disable=no-self-use
+ """Tear down loop on stop."""
+ # start and stop methods are classmethods cause one instance shared across muiltiple threads
+ ExecTimeoutThreadGuard.stop()
+
async def run_loop(self) -> None:
"""Run agent loop."""
self.logger.debug("agent loop started")
- self._state.set(AgentLoopStates.started)
self._set_tasks()
try:
await self._gather_tasks()
@@ -106,6 +113,7 @@ async def wait_run_loop_stopped(self) -> None:
def stop(self) -> None:
"""Stop agent loop."""
+ self.teardown()
self._state.set(AgentLoopStates.stopping)
logger.debug("agent loop stopping!")
if self._loop.is_running():
@@ -122,7 +130,7 @@ def _stop_tasks(self) -> None:
"""Cancel all tasks."""
for task in self._tasks:
if task.done():
- continue
+ continue # pragma: nocover
task.cancel()
@property
@@ -150,7 +158,7 @@ class AsyncAgentLoop(BaseAgentLoop):
NEW_BEHAVIOURS_PROCESS_SLEEP = 1 # check new behaviours registered every second.
- def __init__(self, agent: "AEA", loop: AbstractEventLoop = None):
+ def __init__(self, agent: AbstractAgent, loop: AbstractEventLoop = None):
"""
Init agent loop.
@@ -158,85 +166,130 @@ def __init__(self, agent: "AEA", loop: AbstractEventLoop = None):
:param loop: asyncio loop to use. optional
"""
super().__init__(agent=agent, loop=loop)
- self._agent: "AEA" = self._agent
+ self._agent: AbstractAgent = self._agent
- self._behaviours_registry: Dict[Behaviour, PeriodicCaller] = {}
+ self._periodic_tasks: Dict[Callable, PeriodicCaller] = {}
- def _behaviour_exception_callback(self, fn: Callable, exc: Exception) -> None:
+ def _periodic_task_exception_callback(
+ self, task_callable: Callable, exc: Exception
+ ) -> None:
"""
- Call on behaviour's act exception.
+ Call on periodic task exception.
- :param fn: behaviour's act
+ :param task_callable: function to be called
:param: exc: Exception raised
:return: None
"""
self.logger.exception(
- f"Loop: Exception: `{exc}` occured during `{fn}` processing"
+ f"Loop: Exception: `{exc}` occured during `{task_callable}` processing"
)
self._exceptions.append(exc)
- self._state.set(AgentLoopStates.error)
- def _register_behaviour(self, behaviour: Behaviour) -> None:
+ def _execution_control(
+ self,
+ fn: Callable,
+ args: Optional[Sequence] = None,
+ kwargs: Optional[Dict] = None,
+ ) -> Any:
"""
- Register behaviour to run periodically.
+ Execute skill function in exception handling environment.
- :param behaviour: Behaviour object
+ Logs error, stop agent or propagate exception depends on policy defined.
+
+ :param fn: function to call
+ :param args: optional sequence of arguments to pass to function on call
+ :param kwargs: optional dict of keyword arguments to pass to function on call
+
+ :return: same as function
+ """
+ execution_timeout = getattr(self.agent, "_execution_timeout", 0)
+
+ try:
+ with ExecTimeoutThreadGuard(execution_timeout):
+ return fn(*(args or []), **(kwargs or {}))
+ except TimeoutException: # pragma: nocover
+ self.logger.warning(
+ "`{}` was terminated as its execution exceeded the timeout of {} seconds. Please refactor your code!".format(
+ fn, execution_timeout
+ )
+ )
+ except Exception as e: # pylint: disable=broad-except
+ try:
+ if self.agent.exception_handler(e, fn) is True:
+ self._state.set(AgentLoopStates.error)
+ raise
+ except Exception as e:
+ self._state.set(AgentLoopStates.error)
+ self._exceptions.append(e)
+ raise
+
+ def _register_periodic_task(
+ self,
+ task_callable: Callable,
+ period: float,
+ start_at: Optional[datetime.datetime],
+ ) -> None:
+ """
+ Register function to run periodically.
+
+ :param task_callable: function to be called
+ :param pediod: float: in seconds
+ :param start_at: optional datetime, when to run task for the first time, otherwise call it right now
:return: None
"""
- if behaviour in self._behaviours_registry: # pragma: nocover
+ if task_callable in self._periodic_tasks: # pragma: nocover
# already registered
return
periodic_caller = PeriodicCaller(
- partial(
- self._agent._execution_control, # pylint: disable=protected-access # TODO: refactoring!
- behaviour.act_wrapper,
- behaviour,
- ),
- behaviour.tick_interval,
- behaviour.start_at,
- self._behaviour_exception_callback,
- self._loop,
+ partial(self._execution_control, task_callable),
+ period=period,
+ start_at=start_at,
+ exception_callback=self._periodic_task_exception_callback,
+ loop=self._loop,
)
- self._behaviours_registry[behaviour] = periodic_caller
+ self._periodic_tasks[task_callable] = periodic_caller
periodic_caller.start()
- self.logger.debug(f"Behaviour {behaviour} registered.")
+ self.logger.debug(f"Periodic task {task_callable} registered.")
- def _register_all_behaviours(self) -> None:
- """Register all AEA behaviours to run periodically."""
- for behaviour in self._agent.active_behaviours:
- self._register_behaviour(behaviour)
+ def _register_periodic_tasks(self) -> None:
+ """Register all AEA related periodic tasks."""
+ for (
+ task_callable,
+ (period, start_at),
+ ) in self._agent.get_periodic_tasks().items():
+ self._register_periodic_task(task_callable, period, start_at)
- def _unregister_behaviour(self, behaviour: Behaviour) -> None:
+ def _unregister_periodic_task(self, task_callable: Callable) -> None:
"""
- Unregister periodic execution of the behaviour.
+ Unregister periodic execution of the task.
- :param behaviour: Behaviour to schedule periodic execution.
+ :param task_callable: function to be called periodically.
:return: None
"""
- periodic_caller = self._behaviours_registry.pop(behaviour, None)
+ periodic_caller = self._periodic_tasks.pop(task_callable, None)
if periodic_caller is None: # pragma: nocover
return
periodic_caller.stop()
def _stop_all_behaviours(self) -> None:
"""Unregister periodic execution of all registered behaviours."""
- for behaviour in list(self._behaviours_registry.keys()):
- self._unregister_behaviour(behaviour)
+ for task_callable in list(self._periodic_tasks.keys()):
+ self._unregister_periodic_task(task_callable)
async def _task_wait_for_error(self) -> None:
"""Wait for error and raise first."""
await self._state.wait(AgentLoopStates.error)
raise self._exceptions[0]
- def _stop_tasks(self):
+ def _stop_tasks(self) -> None:
"""Cancel all tasks and stop behaviours registered."""
BaseAgentLoop._stop_tasks(self)
self._stop_all_behaviours()
- def _set_tasks(self):
+ def _set_tasks(self) -> None:
"""Set run loop tasks."""
self._tasks = self._create_tasks()
self.logger.debug("tasks created!")
@@ -248,67 +301,29 @@ def _create_tasks(self) -> List[Task]:
:return: list of asyncio Tasks
"""
tasks = [
- self._task_process_inbox(),
- self._task_process_internal_messages(),
- self._task_process_new_skill_components(),
+ self._process_messages(HandlerItemGetter(self._message_handlers())),
+ self._task_register_periodic_tasks(),
self._task_wait_for_error(),
]
return list(map(self._loop.create_task, tasks)) # type: ignore # some issue with map and create_task
- async def _task_process_inbox(self) -> None:
- """Process incoming messages."""
- inbox: InBox = self._agent.inbox
- self.logger.info("Start processing messages...")
- while self.is_running:
- await inbox.async_wait()
- self._agent.react()
+ def _message_handlers(self) -> List[Tuple[Callable[[Any], None], Callable]]:
+ """Get all agent's message handlers."""
+ return self._agent.get_message_handlers()
- async def _task_process_internal_messages(self) -> None:
- """Process decision maker's internal messages."""
- queue = self._agent.decision_maker.message_out_queue
+ async def _process_messages(self, getter: HandlerItemGetter) -> None:
+ """Process message from ItemGetter."""
+ self.logger.info("Start processing messages...")
+ self._state.set(AgentLoopStates.started)
while self.is_running:
- msg = await queue.async_get()
- # TODO: better interaction with agent's internal messages
- self._agent.filter._process_internal_message( # pylint: disable=protected-access # TODO: refactoring!
- msg
- )
+ handler, item = await getter.get()
+ self._execution_control(handler, [item])
- async def _task_process_new_skill_components(self) -> None:
+ async def _task_register_periodic_tasks(self) -> None:
"""Process new behaviours added to skills in runtime."""
while self.is_running:
- # TODO: better handling internal messages for skills internal updates
- self._agent.filter._handle_new_behaviours() # pylint: disable=protected-access # TODO: refactoring!
- self._agent.filter._handle_new_handlers() # pylint: disable=protected-access # TODO: refactoring!
- self._register_all_behaviours() # re register, cause new may appear
+ self._register_periodic_tasks() # re register, cause new may appear
await asyncio.sleep(self.NEW_BEHAVIOURS_PROCESS_SLEEP)
-class SyncAgentLoop(BaseAgentLoop):
- """Synchronous agent loop."""
-
- def __init__(self, agent: "Agent", loop: AbstractEventLoop = None):
- """
- Init agent loop.
-
- :param agent: AEA instance
- :param loop: asyncio loop to use. optional
- """
- super().__init__(agent=agent, loop=loop)
- self._agent: "AEA" = self._agent
- asyncio.set_event_loop(self._loop)
-
- async def _agent_loop(self) -> None:
- """Run loop inside coroutine but call synchronous callbacks from agent."""
- while self.is_running:
- self._spin_main_loop()
- await asyncio.sleep(self._agent.timeout)
-
- def _spin_main_loop(self) -> None:
- """Run one spin of agent loop: act, react, update."""
- self._agent.act()
- self._agent.react()
- self._agent.update()
-
- def _set_tasks(self) -> None:
- """Set run loop tasks."""
- self._tasks = [self._loop.create_task(self._agent_loop())]
+SyncAgentLoop = AsyncAgentLoop # temporary solution!
diff --git a/aea/cli/config.py b/aea/cli/config.py
index b4ba80a08b..06c18ae08b 100644
--- a/aea/cli/config.py
+++ b/aea/cli/config.py
@@ -37,7 +37,7 @@
@click.group()
@click.pass_context
@check_aea_project
-def config(click_context):
+def config(click_context): # pylint: disable=unused-argument
"""Read or modify a configuration."""
@@ -112,7 +112,7 @@ def _set_config(ctx: Context, json_path: List[str], value: str, type_str: str) -
configuration_obj = config_loader.configuration_class.from_json(
configuration_object
)
- config_loader.validator.validate(instance=configuration_obj.json)
+ config_loader.validate(configuration_obj.json)
config_loader.dump(configuration_obj, open(configuration_file_path, "w"))
except Exception:
raise click.ClickException("Attribute or value not valid.")
diff --git a/aea/cli/core.py b/aea/cli/core.py
index 9113fc9916..9bdb9f40db 100644
--- a/aea/cli/core.py
+++ b/aea/cli/core.py
@@ -37,6 +37,7 @@
from aea.cli.generate_key import generate_key
from aea.cli.generate_wealth import generate_wealth
from aea.cli.get_address import get_address
+from aea.cli.get_multiaddress import get_multiaddress
from aea.cli.get_wealth import get_wealth
from aea.cli.init import init
from aea.cli.install import install
@@ -85,7 +86,9 @@ def cli(click_context, skip_consistency_check: bool) -> None:
@click.option("-p", "--port", default=8080)
@click.option("--local", is_flag=True, help="For using local folder.")
@click.pass_context
-def gui(click_context, port, local): # pragma: no cover
+def gui( # pylint: disable=unused-argument
+ click_context, port, local
+): # pragma: no cover
"""Run the CLI GUI."""
_init_gui()
import aea.cli_gui # pylint: disable=import-outside-toplevel,redefined-outer-name
@@ -101,8 +104,8 @@ def _init_gui() -> None:
:return: None
:raisees: ClickException if author is not set up.
"""
- config = get_or_create_cli_config()
- author = config.get(AUTHOR_KEY, None)
+ config_ = get_or_create_cli_config()
+ author = config_.get(AUTHOR_KEY, None)
if author is None:
raise click.ClickException(
"Author is not set up. Please run 'aea init' and then restart."
@@ -123,6 +126,7 @@ def _init_gui() -> None:
cli.add_command(generate_wealth)
cli.add_command(generate)
cli.add_command(get_address)
+cli.add_command(get_multiaddress)
cli.add_command(get_wealth)
cli.add_command(init)
cli.add_command(install)
diff --git a/aea/cli/eject.py b/aea/cli/eject.py
index f031b632ae..0cb1ca2d55 100644
--- a/aea/cli/eject.py
+++ b/aea/cli/eject.py
@@ -39,7 +39,7 @@
@click.group()
@click.pass_context
@check_aea_project
-def eject(click_context: click.core.Context):
+def eject(click_context: click.core.Context): # pylint: disable=unused-argument
"""Eject an installed item."""
diff --git a/aea/cli/fingerprint.py b/aea/cli/fingerprint.py
index 3e9507bb92..c52bebbafe 100644
--- a/aea/cli/fingerprint.py
+++ b/aea/cli/fingerprint.py
@@ -40,7 +40,7 @@
@click.group()
@click.pass_context
-def fingerprint(click_context):
+def fingerprint(click_context): # pylint: disable=unused-argument
"""Fingerprint a resource."""
diff --git a/aea/cli/generate.py b/aea/cli/generate.py
index 2cfeeaeb35..2eef2171b5 100644
--- a/aea/cli/generate.py
+++ b/aea/cli/generate.py
@@ -40,7 +40,7 @@
@click.group()
@click.pass_context
@check_aea_project
-def generate(click_context):
+def generate(click_context): # pylint: disable=unused-argument
"""Generate a resource for the agent."""
diff --git a/aea/cli/generate_key.py b/aea/cli/generate_key.py
index 1781c0fd5b..8173a276ec 100644
--- a/aea/cli/generate_key.py
+++ b/aea/cli/generate_key.py
@@ -24,7 +24,7 @@
import click
-from aea.crypto.helpers import IDENTIFIER_TO_KEY_FILES, create_private_key
+from aea.crypto.helpers import PRIVATE_KEY_PATH_SCHEMA, create_private_key
from aea.crypto.registries import crypto_registry
@@ -57,14 +57,13 @@ def _generate_private_key(type_: str, file: Optional[str] = None) -> None:
"""
if type_ == "all" and file is not None:
raise click.ClickException("Type all cannot be used in combination with file.")
- elif type_ == "all":
- types = list(IDENTIFIER_TO_KEY_FILES.keys())
- else:
- types = [type_]
- for type_ in types:
- private_key_file = IDENTIFIER_TO_KEY_FILES[type_] if file is None else file
+ types = list(crypto_registry.supported_ids) if type_ == "all" else [type_]
+ for type__ in types:
+ private_key_file = (
+ PRIVATE_KEY_PATH_SCHEMA.format(type__) if file is None else file
+ )
if _can_write(private_key_file):
- create_private_key(type_, private_key_file)
+ create_private_key(type__, private_key_file)
def _can_write(path) -> bool:
@@ -74,5 +73,4 @@ def _can_write(path) -> bool:
default=False,
)
return value
- else:
- return True
+ return True
diff --git a/aea/cli/generate_wealth.py b/aea/cli/generate_wealth.py
index fc96215077..c8d346a4a0 100644
--- a/aea/cli/generate_wealth.py
+++ b/aea/cli/generate_wealth.py
@@ -19,26 +19,18 @@
"""Implementation of the 'aea generate_wealth' subcommand."""
-import time
from typing import Dict, Optional, cast
import click
from aea.cli.utils.context import Context
from aea.cli.utils.decorators import check_aea_project
-from aea.cli.utils.package_utils import (
- try_get_balance,
- verify_or_create_private_keys_ctx,
-)
-from aea.configurations.base import AgentConfig
+from aea.cli.utils.package_utils import verify_or_create_private_keys_ctx
from aea.crypto.helpers import try_generate_testnet_wealth
from aea.crypto.registries import faucet_apis_registry, make_faucet_api_cls
from aea.crypto.wallet import Wallet
-FUNDS_RELEASE_TIMEOUT = 30
-
-
@click.command()
@click.argument(
"type_",
@@ -84,30 +76,7 @@ def _try_generate_wealth(
address, testnet
)
)
- try_generate_testnet_wealth(type_, address)
- if sync:
- _wait_funds_release(ctx.agent_config, wallet, type_)
+ try_generate_testnet_wealth(type_, address, sync)
- except (AssertionError, ValueError) as e: # pragma: no cover
+ except ValueError as e: # pragma: no cover
raise click.ClickException(str(e))
-
-
-def _wait_funds_release(agent_config: AgentConfig, wallet: Wallet, type_: str) -> None:
- """
- Wait for the funds to be released.
-
- :param agent_config: the agent config
- :param wallet: the wallet
- :param type_: the network type
- """
- start_balance = try_get_balance(agent_config, wallet, type_)
- end_time = time.time() + FUNDS_RELEASE_TIMEOUT
- has_hit_timeout = True
- while time.time() < end_time:
- current_balance = try_get_balance(agent_config, wallet, type_)
- if start_balance != current_balance:
- has_hit_timeout = False
- break # pragma: no cover
- time.sleep(1)
- if has_hit_timeout:
- raise ValueError("Timeout hit. Syncing did not finish.")
diff --git a/aea/cli/get_multiaddress.py b/aea/cli/get_multiaddress.py
new file mode 100644
index 0000000000..81d63ad2f3
--- /dev/null
+++ b/aea/cli/get_multiaddress.py
@@ -0,0 +1,243 @@
+# -*- coding: utf-8 -*-
+# ------------------------------------------------------------------------------
+#
+# Copyright 2018-2020 Fetch.AI Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ------------------------------------------------------------------------------
+
+"""Implementation of the 'aea get_multiaddress' subcommand."""
+import re
+import typing
+from pathlib import Path
+from typing import Optional, Tuple, cast
+
+import click
+from click import ClickException
+
+from aea.cli.utils.click_utils import PublicIdParameter
+from aea.cli.utils.config import load_item_config
+from aea.cli.utils.context import Context
+from aea.cli.utils.decorators import check_aea_project
+from aea.cli.utils.package_utils import get_package_path_unified
+from aea.configurations.base import (
+ ConnectionConfig,
+ PublicId,
+)
+from aea.crypto.base import Crypto
+from aea.crypto.registries import crypto_registry
+from aea.exceptions import enforce
+from aea.helpers.multiaddr.base import MultiAddr
+
+
+URI_REGEX = re.compile(r"(?:https?://)?(?P
diff --git a/docs/aries-cloud-agent-demo.md b/docs/aries-cloud-agent-demo.md
index 86375dfa4b..3a24b6ff48 100644
--- a/docs/aries-cloud-agent-demo.md
+++ b/docs/aries-cloud-agent-demo.md
@@ -8,7 +8,7 @@ Demonstrating an entire decentralised identity scenario involving AEAs and insta
## Discussion
-This demo corresponds with the one here from aries cloud agent repository .
+This demo corresponds with the one here from aries cloud agent repository .
The aim of this demo is to illustrate how AEAs can connect to ACAs, thus gaining all of their capabilities, such as issuing and requesting verifiable credentials, selective disclosure and zero knowledge proofs.
@@ -81,7 +81,7 @@ All messages from an AEA to another AEA utilise the p2p communication network ac
All messages initiated from an ACA to an AEA are webhooks (using `webhook` connection).
-This is the extent of the demo at this point. The rest of the interactions require an instance of the Indy ledger to run. This is what will be implemented next.
+This is the extent of the demo at this point. The rest of the interactions require an instance of the Indy ledger to run. This is what will be implemented next.
The rest of the interactions are broadly as follows:
@@ -101,7 +101,7 @@ At this point, the two ACAs are connected to each other.
Follow the Preliminaries and Installation sections from the AEA quick start.
-Install Aries cloud-agents (for more info see here) if you do not have it on your machine:
+Install Aries cloud-agents (for more info see here) if you do not have it on your machine:
``` bash
pip install aries-cloudagent
@@ -109,7 +109,7 @@ pip install aries-cloudagent
This demo has been successfully tested with aca-py version 0.4.5.
-This demo requires an instance of von network running in docker locally (for more info see here)
+This demo requires an instance of von network running in docker locally (for more info see here)
This demo has been successfully tested with the von-network git repository pulled on 07 Aug 2020 (commit number `ad1f84f64d4f4c106a81462f5fbff496c5fbf10e`).
@@ -121,13 +121,13 @@ Open five terminals. The first terminal is used to run an instance of von-networ
In the first terminal move to the `von-network` directory and run an instance of `von-network` locally in docker.
-This tutorial has information on starting (and stopping) the network locally.
+This tutorial has information on starting (and stopping) the network locally.
``` bash
./manage build
./manage start --logs
```
-Once the ledger is running, you can see the ledger by going to the web server running on port 9000. On localhost, that means going to http://localhost:9000.
+Once the ledger is running, you can see the ledger by going to the web server running on port 9000. On localhost, that means going to http://localhost:9000.
## Alice and Faber ACAs
@@ -180,7 +180,7 @@ Now you can create **Alice_AEA** and **Faber_AEA** in terminals 3 and 4 respecti
In the third terminal, fetch **Alice_AEA** and move into its project folder:
``` bash
-aea fetch fetchai/aries_alice:0.8.0
+aea fetch fetchai/aries_alice:0.9.0
cd aries_alice
```
@@ -191,11 +191,11 @@ The following steps create **Alice_AEA** from scratch:
``` bash
aea create aries_alice
cd aries_alice
-aea add connection fetchai/p2p_libp2p:0.7.0
-aea add connection fetchai/soef:0.6.0
-aea add connection fetchai/http_client:0.7.0
-aea add connection fetchai/webhook:0.5.0
-aea add skill fetchai/aries_alice:0.5.0
+aea add connection fetchai/p2p_libp2p:0.8.0
+aea add connection fetchai/soef:0.7.0
+aea add connection fetchai/http_client:0.8.0
+aea add connection fetchai/webhook:0.6.0
+aea add skill fetchai/aries_alice:0.6.0
```
Note
diff --git a/docs/demos.md b/docs/demos.md new file mode 100644 index 0000000000..6d33a2fb6f --- /dev/null +++ b/docs/demos.md @@ -0,0 +1,5 @@ +We provide demo guides for multiple use-cases, each one involving several AEAs interacting in a different scenario. + +These demos serve to highlight the concept of AEAs as well as provide inspiration for developers. + +Demos are alphabetically sorted, we recommend you start with the weather skills demo. \ No newline at end of file diff --git a/docs/deployment.md b/docs/deployment.md index e6d64b2b4d..dc6ab0f443 100644 --- a/docs/deployment.md +++ b/docs/deployment.md @@ -1,9 +1,39 @@ + +The easiest way to run an AEA is using your development environment. + +If you would like to run an AEA from a browser you can use Google Colab. This gist can be opened in Colab and implements the quickstart. + +For deployment, we recommend you use Docker. + +## Building a Docker Image + +First, we fetch a directory containing a Dockerfile and some dependencies: +``` bash +svn export https://github.com/fetchai/agents-aea/branches/master/deploy-image +cd deploy-image +rm -rf scripts +svn export https://github.com/fetchai/docker-images/branches/master/scripts +cd .. +``` + +Next, we build the image: +``` bash +./deploy-image/scripts/docker-build-img.sh -t aea-deploy:latest -- +``` + +## Running a Docker Image + +Finally, we run it: +``` bash +docker run -it aea-deploy:latest +``` + +This will run the `fetchai/my_first_aea:0.10.0` demo project. You can edit `entrypoint.sh` to run whatever project you would like. + +## Deployment +Note
This section is incomplete and will soon be updated.
Note
-This developer documentation is a work in progress. If you spot any errors please open an issue here.
+This developer documentation is a work in progress. If you spot any errors please open an issue on Github or contact us in the developer Slack channel.
This section is incomplete, and will be updated soon!