diff --git a/aea/aea.py b/aea/aea.py index 7027bb452b..2470087ad5 100644 --- a/aea/aea.py +++ b/aea/aea.py @@ -34,6 +34,7 @@ from aea.exceptions import AEAException from aea.helpers.exception_policy import ExceptionPolicyEnum from aea.helpers.exec_timeout import ExecTimeoutThreadGuard, TimeoutException +from aea.helpers.logging import AgentLoggerAdapter from aea.identity.base import Identity from aea.mail.base import Envelope from aea.protocols.base import Message @@ -135,6 +136,8 @@ def __init__( self._skills_exception_policy = skill_exception_policy + self._setup_loggers() + @property def decision_maker(self) -> DecisionMaker: """Get decision maker.""" @@ -383,3 +386,18 @@ def teardown(self) -> None: self.task_manager.stop() self.resources.teardown() ExecTimeoutThreadGuard.stop() + + def _setup_loggers(self): + """Setup logger with agent name. """ + for element in [ + self.main_loop, + self.multiplexer, + self.task_manager, + self.resources.component_registry, + self.resources.behaviour_registry, + self.resources.handler_registry, + self.resources.model_registry, + ]: + element.logger = AgentLoggerAdapter( + element.logger, agent_name=self._identity.name + ) diff --git a/aea/aea_builder.py b/aea/aea_builder.py index 46fbf05383..0b043d98b6 100644 --- a/aea/aea_builder.py +++ b/aea/aea_builder.py @@ -84,6 +84,7 @@ from aea.exceptions import AEAException from aea.helpers.base import load_aea_package, load_module from aea.helpers.exception_policy import ExceptionPolicyEnum +from aea.helpers.logging import AgentLoggerAdapter from aea.helpers.pypi import is_satisfiable from aea.helpers.pypi import merge_dependencies from aea.identity.base import Identity @@ -879,11 +880,12 @@ def build(self, connection_ids: Optional[Collection[PublicId]] = None,) -> AEA: copy(self.private_key_paths), copy(self.connection_private_key_paths) ) identity = self._build_identity_from_wallet(wallet) - self._load_and_add_components(ComponentType.PROTOCOL, resources) - self._load_and_add_components(ComponentType.CONTRACT, resources) + self._load_and_add_components(ComponentType.PROTOCOL, resources, identity.name) + self._load_and_add_components(ComponentType.CONTRACT, resources, identity.name) self._load_and_add_components( ComponentType.CONNECTION, resources, + identity.name, identity=identity, crypto_store=wallet.connection_cryptos, ) @@ -908,7 +910,7 @@ def build(self, connection_ids: Optional[Collection[PublicId]] = None,) -> AEA: **deepcopy(self._context_namespace), ) self._load_and_add_components( - ComponentType.SKILL, resources, agent_context=aea.context + ComponentType.SKILL, resources, identity.name, agent_context=aea.context ) self._build_called = True self._populate_contract_registry() @@ -1346,28 +1348,36 @@ def from_aea_project( return builder def _load_and_add_components( - self, component_type: ComponentType, resources: Resources, **kwargs + self, + component_type: ComponentType, + resources: Resources, + agent_name: str, + **kwargs, ) -> None: """ Load and add components added to the builder to a Resources instance. :param component_type: the component type for which :param resources: the resources object to populate. + :param agent_name: the AEA name for logging purposes. :param kwargs: keyword argument to forward to the component loader. :return: None """ for configuration in self._package_dependency_manager.get_components_by_type( component_type ).values(): + if configuration.is_abstract_component: + load_aea_package(configuration) + continue + if configuration in self._component_instances[component_type].keys(): component = self._component_instances[component_type][configuration] - resources.add_component(component) - elif configuration.is_abstract_component: - load_aea_package(configuration) else: configuration = deepcopy(configuration) component = load_component_from_config(configuration, **kwargs) - resources.add_component(component) + + _set_logger_to_component(component, configuration, agent_name) + resources.add_component(component) def _populate_contract_registry(self): """Populate contract registry.""" @@ -1413,6 +1423,25 @@ def _check_we_can_build(self): ) +def _set_logger_to_component( + component: Component, configuration: ComponentConfiguration, agent_name: str, +) -> None: + """ + Set the logger to the component. + + :param component: the component instance. + :param configuration: the component configuration + :param agent_name: the agent name + :return: None + """ + if configuration.component_type == ComponentType.SKILL: + # skip because skill object already have their own logger from the skill context. + return + logger_name = f"aea.packages.{configuration.author}.{configuration.component_type.to_plural()}.{configuration.name}" + logger = AgentLoggerAdapter(logging.getLogger(logger_name), agent_name) + component.logger = logger + + # TODO this function is repeated in 'aea.cli.utils.package_utils.py' def _verify_or_create_private_keys(aea_project_path: Path) -> None: """Verify or create private keys.""" diff --git a/aea/agent_loop.py b/aea/agent_loop.py index 5f6d176a2d..639fc0fea5 100644 --- a/aea/agent_loop.py +++ b/aea/agent_loop.py @@ -39,6 +39,7 @@ PeriodicCaller, ensure_loop, ) +from aea.helpers.logging import WithLogger from aea.multiplexer import InBox from aea.skills.base import Behaviour @@ -50,7 +51,7 @@ from aea.agent import Agent # pragma: no cover -class BaseAgentLoop(ABC): +class BaseAgentLoop(WithLogger, ABC): """Base abstract agent loop class.""" def __init__( @@ -61,6 +62,7 @@ def __init__( :params agent: Agent or AEA to run. :params loop: optional asyncio event loop. if not specified a new loop will be created. """ + WithLogger.__init__(self, logger) self._agent: "Agent" = agent self.set_loop(ensure_loop(loop)) self._tasks: List[asyncio.Task] = [] @@ -77,7 +79,7 @@ def start(self) -> None: async def run_loop(self) -> None: """Run agent loop.""" - logger.debug("agent loop started") + self.logger.debug("agent loop started") self._state.set(AgentLoopStates.started) self._set_tasks() try: @@ -171,7 +173,9 @@ def _behaviour_exception_callback(self, fn: Callable, exc: Exception) -> None: :return: None """ - logger.exception(f"Loop: Exception: `{exc}` occured during `{fn}` processing") + self.logger.exception( + f"Loop: Exception: `{exc}` occured during `{fn}` processing" + ) self._exceptions.append(exc) self._state.set(AgentLoopStates.error) @@ -200,7 +204,7 @@ def _register_behaviour(self, behaviour: Behaviour) -> None: ) self._behaviours_registry[behaviour] = periodic_caller periodic_caller.start() - logger.debug(f"Behaviour {behaviour} registered.") + self.logger.debug(f"Behaviour {behaviour} registered.") def _register_all_behaviours(self) -> None: """Register all AEA behaviours to run periodically.""" @@ -237,7 +241,7 @@ def _stop_tasks(self): def _set_tasks(self): """Set run loop tasks.""" self._tasks = self._create_tasks() - logger.debug("tasks created!") + self.logger.debug("tasks created!") def _create_tasks(self) -> List[Task]: """ @@ -256,7 +260,7 @@ def _create_tasks(self) -> List[Task]: async def _task_process_inbox(self) -> None: """Process incoming messages.""" inbox: InBox = self._agent.inbox - logger.info("[{}]: Start processing messages...".format(self._agent.name)) + self.logger.info("[{}]: Start processing messages...".format(self._agent.name)) while self.is_running: await inbox.async_wait() diff --git a/aea/components/base.py b/aea/components/base.py index 25a8e2e116..166defa01d 100644 --- a/aea/components/base.py +++ b/aea/components/base.py @@ -30,11 +30,12 @@ ComponentType, PublicId, ) +from aea.helpers.logging import WithLogger logger = logging.getLogger(__name__) -class Component(ABC): +class Component(ABC, WithLogger): """Abstract class for an agent component.""" def __init__( @@ -48,6 +49,7 @@ def __init__( :param configuration: the package configuration. :param is_vendor: whether the package is vendorized. """ + WithLogger.__init__(self) self._configuration = configuration self._directory = None # type: Optional[Path] self._is_vendor = is_vendor diff --git a/aea/helpers/logging.py b/aea/helpers/logging.py new file mode 100644 index 0000000000..6cc46866ba --- /dev/null +++ b/aea/helpers/logging.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +# ------------------------------------------------------------------------------ +# +# Copyright 2018-2019 Fetch.AI Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------------ +"""Logging helpers.""" +import logging +from logging import Logger, LoggerAdapter +from typing import Any, MutableMapping, Optional, Tuple, Union + + +class AgentLoggerAdapter(LoggerAdapter): + """This class is a logger adapter that prepends the agent name to log messages.""" + + def __init__(self, logger: Logger, agent_name: str): + """ + Initialize the logger adapter. + + :param agent_name: the agent name. + """ + super().__init__(logger, dict(agent_name=agent_name)) + + def process( + self, msg: Any, kwargs: MutableMapping[str, Any] + ) -> Tuple[Any, MutableMapping[str, Any]]: + """Prepend the agent name to every log message.""" + return f"[{self.extra['agent_name']}] {msg}", kwargs + + +class WithLogger: + """Interface to endow subclasses with a logger.""" + + def __init__( + self, + logger: Optional[Union[Logger, LoggerAdapter]] = None, + default_logger_name: str = "aea", + ): + """ + Initialize the logger. + + :param logger: the logger object. + :param default_logger_name: the default logger name, if a logger is not provided. + """ + self._logger = logger + self._default_logger_name = default_logger_name + + @property + def logger(self) -> Union[Logger, LoggerAdapter]: + """Get the component logger.""" + if self._logger is None: + # if not set (e.g. programmatic instantiation) + # return a default one with the default logger name. + return logging.getLogger(self._default_logger_name) + return self._logger + + @logger.setter + def logger(self, logger: Union[Logger, LoggerAdapter]): + """Set the logger.""" + self._logger = logger diff --git a/aea/multiplexer.py b/aea/multiplexer.py index b274de8b32..520a7f2f54 100644 --- a/aea/multiplexer.py +++ b/aea/multiplexer.py @@ -28,18 +28,19 @@ from aea.connections.base import Connection, ConnectionStatus from aea.helpers.async_friendly_queue import AsyncFriendlyQueue from aea.helpers.async_utils import ThreadedAsyncRunner, cancel_and_wait +from aea.helpers.logging import WithLogger from aea.mail.base import ( AEAConnectionError, Address, Empty, Envelope, EnvelopeContext, - logger, + logger as default_logger, ) from aea.protocols.base import Message -class AsyncMultiplexer: +class AsyncMultiplexer(WithLogger): """This class can handle multiple connections at once.""" def __init__( @@ -56,7 +57,9 @@ def __init__( This information is used for envelopes which don't specify any routing context. If connections is None, this parameter is ignored. :param loop: the event loop to run the multiplexer. If None, a new event loop is created. + :param agent_name: the name of the agent that owns the multiplexer, for logging purposes. """ + super().__init__(default_logger) self._connections: List[Connection] = [] self._id_to_connection: Dict[PublicId, Connection] = {} self._default_connection: Optional[Connection] = None @@ -107,7 +110,7 @@ def add_connection(self, connection: Connection, is_default: bool = False) -> No :return: None """ if connection.connection_id in self._id_to_connection: - logger.warning( + self.logger.warning( f"A connection with id {connection.connection_id} was already added. Replacing it..." ) @@ -174,13 +177,13 @@ def connection_status(self) -> ConnectionStatus: async def connect(self) -> None: """Connect the multiplexer.""" - logger.debug("Multiplexer connecting...") + self.logger.debug("Multiplexer connecting...") self._connection_consistency_checks() self._set_default_connection_if_none() self._out_queue = asyncio.Queue() async with self._lock: if self.connection_status.is_connected: - logger.debug("Multiplexer already connected.") + self.logger.debug("Multiplexer already connected.") return try: await self._connect_all() @@ -188,28 +191,28 @@ async def connect(self) -> None: self._connection_status.is_connected = True self._recv_loop_task = self._loop.create_task(self._receiving_loop()) self._send_loop_task = self._loop.create_task(self._send_loop()) - logger.debug("Multiplexer connected and running.") + self.logger.debug("Multiplexer connected and running.") except (CancelledError, Exception): - logger.exception("Exception on connect:") + self.logger.exception("Exception on connect:") self._connection_status.is_connected = False await self._stop() raise AEAConnectionError("Failed to connect the multiplexer.") async def disconnect(self) -> None: """Disconnect the multiplexer.""" - logger.debug("Multiplexer disconnecting...") + self.logger.debug("Multiplexer disconnecting...") async with self._lock: if not self.connection_status.is_connected: - logger.debug("Multiplexer already disconnected.") + self.logger.debug("Multiplexer already disconnected.") await asyncio.wait_for(self._stop(), timeout=60) return try: await asyncio.wait_for(self._disconnect_all(), timeout=60) await asyncio.wait_for(self._stop(), timeout=60) self._connection_status.is_connected = False - logger.debug("Multiplexer disconnected.") + self.logger.debug("Multiplexer disconnected.") except (CancelledError, Exception): - logger.exception("Exception on disconnect:") + self.logger.exception("Exception on disconnect:") raise AEAConnectionError("Failed to disconnect the multiplexer.") async def _stop(self) -> None: @@ -219,7 +222,7 @@ async def _stop(self) -> None: Stops recv and send loops. Disconnect every connection. """ - logger.debug("Stopping multiplexer...") + self.logger.debug("Stopping multiplexer...") await cancel_and_wait(self._recv_loop_task) self._recv_loop_task = None @@ -235,18 +238,18 @@ async def _stop(self) -> None: if c.connection_status.is_connected or c.connection_status.is_connecting ]: await connection.disconnect() - logger.debug("Multiplexer stopped.") + self.logger.debug("Multiplexer stopped.") async def _connect_all(self) -> None: """Set all the connection up.""" - logger.debug("Starting multiplexer connections.") + self.logger.debug("Starting multiplexer connections.") connected = [] # type: List[PublicId] for connection_id, connection in self._id_to_connection.items(): try: await self._connect_one(connection_id) connected.append(connection_id) except Exception as e: # pylint: disable=broad-except - logger.error( + self.logger.error( "Error while connecting {}: {}".format( str(type(connection)), str(e) ) @@ -254,7 +257,7 @@ async def _connect_all(self) -> None: for c in connected: await self._disconnect_one(c) break - logger.debug("Multiplexer connections are set.") + self.logger.debug("Multiplexer connections are set.") async def _connect_one(self, connection_id: PublicId) -> None: """ @@ -264,15 +267,15 @@ async def _connect_one(self, connection_id: PublicId) -> None: :return: None """ connection = self._id_to_connection[connection_id] - logger.debug("Processing connection {}".format(connection.connection_id)) + self.logger.debug("Processing connection {}".format(connection.connection_id)) if connection.connection_status.is_connected: - logger.debug( + self.logger.debug( "Connection {} already established.".format(connection.connection_id) ) else: connection.loop = self._loop await connection.connect() - logger.debug( + self.logger.debug( "Connection {} has been set up successfully.".format( connection.connection_id ) @@ -280,12 +283,12 @@ async def _connect_one(self, connection_id: PublicId) -> None: async def _disconnect_all(self) -> None: """Tear all the connections down.""" - logger.debug("Tear the multiplexer connections down.") + self.logger.debug("Tear the multiplexer connections down.") for connection_id, connection in self._id_to_connection.items(): try: await self._disconnect_one(connection_id) except Exception as e: # pylint: disable=broad-except - logger.error( + self.logger.error( "Error while disconnecting {}: {}".format( str(type(connection)), str(e) ) @@ -299,14 +302,14 @@ async def _disconnect_one(self, connection_id: PublicId) -> None: :return: None """ connection = self._id_to_connection[connection_id] - logger.debug("Processing connection {}".format(connection.connection_id)) + self.logger.debug("Processing connection {}".format(connection.connection_id)) if not connection.connection_status.is_connected: - logger.debug( + self.logger.debug( "Connection {} already disconnected.".format(connection.connection_id) ) else: await connection.disconnect() - logger.debug( + self.logger.debug( "Connection {} has been disconnected successfully.".format( connection.connection_id ) @@ -315,39 +318,41 @@ async def _disconnect_one(self, connection_id: PublicId) -> None: async def _send_loop(self) -> None: """Process the outgoing envelopes.""" if not self.is_connected: - logger.debug("Sending loop not started. The multiplexer is not connected.") + self.logger.debug( + "Sending loop not started. The multiplexer is not connected." + ) return while self.is_connected: try: - logger.debug("Waiting for outgoing envelopes...") + self.logger.debug("Waiting for outgoing envelopes...") envelope = await self.out_queue.get() if envelope is None: - logger.debug( + self.logger.debug( "Received empty envelope. Quitting the sending loop..." ) return None - logger.debug("Sending envelope {}".format(str(envelope))) + self.logger.debug("Sending envelope {}".format(str(envelope))) await self._send(envelope) except asyncio.CancelledError: - logger.debug("Sending loop cancelled.") + self.logger.debug("Sending loop cancelled.") return except AEAConnectionError as e: - logger.error(str(e)) + self.logger.error(str(e)) except Exception as e: # pylint: disable=broad-except - logger.error("Error in the sending loop: {}".format(str(e))) + self.logger.error("Error in the sending loop: {}".format(str(e))) return async def _receiving_loop(self) -> None: """Process incoming envelopes.""" - logger.debug("Starting receving loop...") + self.logger.debug("Starting receving loop...") task_to_connection = { asyncio.ensure_future(conn.receive()): conn for conn in self.connections } while self.connection_status.is_connected and len(task_to_connection) > 0: try: - # logger.debug("Waiting for incoming envelopes...") + # self.self.logger.debug("Waiting for incoming envelopes...") done, _pending = await asyncio.wait( task_to_connection.keys(), return_when=asyncio.FIRST_COMPLETED ) @@ -365,17 +370,17 @@ async def _receiving_loop(self) -> None: task_to_connection[new_task] = connection except asyncio.CancelledError: - logger.debug("Receiving loop cancelled.") + self.logger.debug("Receiving loop cancelled.") break except Exception as e: # pylint: disable=broad-except - logger.error("Error in the receiving loop: {}".format(str(e))) - logger.exception("Error in the receiving loop: {}".format(str(e))) + self.logger.error("Error in the receiving loop: {}".format(str(e))) + self.logger.exception("Error in the receiving loop: {}".format(str(e))) break # cancel all the receiving tasks. for t in task_to_connection.keys(): t.cancel() - logger.debug("Receiving loop terminated.") + self.logger.debug("Receiving loop terminated.") async def _send(self, envelope: Envelope) -> None: """ @@ -395,7 +400,7 @@ async def _send(self, envelope: Envelope) -> None: # second, try to route by default routing if connection_id is None and envelope.protocol_id in self.default_routing: connection_id = self.default_routing[envelope.protocol_id] - logger.debug("Using default routing: {}".format(connection_id)) + self.logger.debug("Using default routing: {}".format(connection_id)) if connection_id is not None and connection_id not in self._id_to_connection: raise AEAConnectionError( @@ -403,7 +408,9 @@ async def _send(self, envelope: Envelope) -> None: ) if connection_id is None: - logger.debug("Using default connection: {}".format(self.default_connection)) + self.logger.debug( + "Using default connection: {}".format(self.default_connection) + ) connection = self.default_connection else: connection = self._id_to_connection[connection_id] @@ -413,7 +420,7 @@ async def _send(self, envelope: Envelope) -> None: len(connection.restricted_to_protocols) > 0 and envelope.protocol_id not in connection.restricted_to_protocols ): - logger.warning( + self.logger.warning( "Connection {} cannot handle protocol {}. Cannot send the envelope.".format( connection.connection_id, envelope.protocol_id ) @@ -532,7 +539,7 @@ def disconnect(self) -> None: # type: ignore # cause overrides coroutine # pyli Also stops a dedicated thread for event loop if spawned on connect. """ - logger.debug("Disconnect called") + self.logger.debug("Disconnect called") with self._sync_lock: if not self._loop.is_running(): return @@ -540,12 +547,12 @@ def disconnect(self) -> None: # type: ignore # cause overrides coroutine # pyli if self._is_connected: self._thread_runner.call(super().disconnect()).result(240) self._is_connected = False - logger.debug("Disconnect async method executed") + self.logger.debug("Disconnect async method executed") if self._thread_runner.is_alive() and self._thread_was_started: self._thread_runner.stop() - logger.debug("Thread stopped") - logger.debug("Disconnected") + self.logger.debug("Thread stopped") + self.logger.debug("Disconnected") def put(self, envelope: Envelope) -> None: # type: ignore # cause overrides coroutine """ @@ -609,11 +616,11 @@ def get(self, block: bool = False, timeout: Optional[float] = None) -> Envelope: :return: the envelope object. :raises Empty: if the attempt to get an envelope fails. """ - logger.debug("Checks for envelope from the in queue...") + self._multiplexer.logger.debug("Checks for envelope from the in queue...") envelope = self._multiplexer.get(block=block, timeout=timeout) if envelope is None: raise Empty() - logger.debug( + self._multiplexer.logger.debug( "Incoming envelope: to='{}' sender='{}' protocol_id='{}' message='{!r}'".format( envelope.to, envelope.sender, envelope.protocol_id, envelope.message ) @@ -638,11 +645,13 @@ async def async_get(self) -> Envelope: :return: the envelope object. """ - logger.debug("Checks for envelope from the in queue async way...") + self._multiplexer.logger.debug( + "Checks for envelope from the in queue async way..." + ) envelope = await self._multiplexer.async_get() if envelope is None: raise Empty() - logger.debug( + self._multiplexer.logger.debug( "Incoming envelope: to='{}' sender='{}' protocol_id='{}' message='{!r}'".format( envelope.to, envelope.sender, envelope.protocol_id, envelope.message ) @@ -655,7 +664,9 @@ async def async_wait(self) -> None: :return: the envelope object. """ - logger.debug("Checks for envelope presents in queue async way...") + self._multiplexer.logger.debug( + "Checks for envelope presents in queue async way..." + ) await self._multiplexer.async_wait() @@ -688,7 +699,7 @@ def put(self, envelope: Envelope) -> None: :param envelope: the envelope. :return: None """ - logger.debug( + self._multiplexer.logger.debug( "Put an envelope in the queue: to='{}' sender='{}' protocol_id='{}' message='{!r}' context='{}'...".format( envelope.to, envelope.sender, diff --git a/aea/registries/base.py b/aea/registries/base.py index e8bdd950ec..3e162fdb6b 100644 --- a/aea/registries/base.py +++ b/aea/registries/base.py @@ -32,6 +32,7 @@ PublicId, SkillId, ) +from aea.helpers.logging import WithLogger from aea.skills.base import Behaviour, Handler, Model logger = logging.getLogger(__name__) @@ -41,9 +42,13 @@ SkillComponentType = TypeVar("SkillComponentType", Handler, Behaviour, Model) -class Registry(Generic[ItemId, Item], ABC): +class Registry(Generic[ItemId, Item], WithLogger, ABC): """This class implements an abstract registry.""" + def __init__(self): + """Initialize the registry.""" + super().__init__(logger) + @abstractmethod def register(self, item_id: ItemId, item: Item) -> None: """ @@ -108,6 +113,7 @@ def __init__(self) -> None: :return: None """ + super().__init__() self._components_by_type: Dict[ComponentType, Dict[PublicId, Component]] = {} self._registered_keys: Set[ComponentId] = set() @@ -157,7 +163,9 @@ def _unregister(self, component_id: ComponentId) -> None: ) self._registered_keys.discard(component_id) if item is not None: - logger.debug("Component '{}' has been removed.".format(item.component_id)) + self.logger.debug( + "Component '{}' has been removed.".format(item.component_id) + ) def unregister( self, component_id: ComponentId @@ -236,6 +244,7 @@ def __init__(self) -> None: :return: None """ + super().__init__() self._items = {} # type: Dict[SkillId, Dict[str, SkillComponentType]] def register(self, item_id: Tuple[SkillId, str], item: SkillComponentType) -> None: @@ -272,7 +281,7 @@ def unregister(self, item_id: Tuple[SkillId, str]) -> None: raise ValueError( "No item registered with component id '{}'".format(item_id) ) - logger.debug("Unregistering item with id {}".format(item_id)) + self.logger.debug("Unregistering item with id {}".format(item_id)) name_to_item.pop(item_name) if len(name_to_item) == 0: @@ -315,14 +324,14 @@ def setup(self) -> None: """ for item in self.fetch_all(): if item.context.is_active: - logger.debug( + self.logger.debug( "Calling setup() of component {} of skill {}".format( item.name, item.skill_id ) ) item.setup() else: - logger.debug( + self.logger.debug( "Ignoring setup() of component {} of skill {}, because the skill is not active.".format( item.name, item.skill_id ) @@ -339,7 +348,7 @@ def teardown(self) -> None: try: item.teardown() except Exception as e: # pragma: nocover # pylint: disable=broad-except - logger.warning( + self.logger.warning( "An error occurred while tearing down item {}/{}: {}".format( skill_id, type(item).__name__, str(e) ) @@ -411,7 +420,9 @@ def unregister(self, item_id: Tuple[SkillId, str]) -> None: raise ValueError( "No item registered with component id '{}'".format(item_id) ) - logger.debug("Unregistering item with id {}".format(item_id)) + self.logger.debug( # pylint: disable=no-member + "Unregistering item with id {}".format(item_id) + ) handler = name_to_item.pop(item_name) if len(name_to_item) == 0: diff --git a/aea/registries/resources.py b/aea/registries/resources.py index 86fa887103..2c0d55c241 100644 --- a/aea/registries/resources.py +++ b/aea/registries/resources.py @@ -63,6 +63,11 @@ def __init__(self) -> None: self._model_registry, ] # type: List[Registry] + @property + def component_registry(self) -> AgentComponentRegistry: + """Get the agent component registry.""" + return self._component_registry + @property def behaviour_registry(self) -> ComponentRegistry[Behaviour]: """Get the behaviour registry.""" @@ -73,6 +78,11 @@ def handler_registry(self) -> HandlerRegistry: """Get the handler registry.""" return self._handler_registry + @property + def model_registry(self) -> ComponentRegistry[Model]: + """Get the model registry.""" + return self._model_registry + def add_component(self, component: Component): """Add a component to resources.""" if component.component_type == ComponentType.PROTOCOL: diff --git a/aea/skills/base.py b/aea/skills/base.py index 5dea68ef4d..f651a75cc7 100644 --- a/aea/skills/base.py +++ b/aea/skills/base.py @@ -25,11 +25,11 @@ import queue import re from abc import ABC, abstractmethod -from logging import Logger +from logging import Logger, LoggerAdapter from pathlib import Path from queue import Queue from types import SimpleNamespace -from typing import Any, Dict, Optional, Sequence, Set, Tuple, Type, cast +from typing import Any, Dict, Optional, Sequence, Set, Tuple, Type, Union, cast from aea.components.base import Component from aea.configurations.base import ( @@ -45,6 +45,7 @@ from aea.contracts.base import Contract from aea.exceptions import AEAException from aea.helpers.base import load_aea_package, load_module +from aea.helpers.logging import AgentLoggerAdapter from aea.mail.base import Address from aea.multiplexer import OutBox from aea.protocols.base import Message @@ -73,16 +74,16 @@ def __init__( self._is_active = True # type: bool self._new_behaviours_queue = queue.Queue() # type: Queue - self._logger = None # type: Optional[Logger] + self._logger: Optional[Union[Logger, LoggerAdapter]] = None @property - def logger(self) -> Logger: + def logger(self) -> Union[Logger, LoggerAdapter]: """Get the logger.""" assert self._logger is not None, "Logger not set." return self._logger @logger.setter - def logger(self, logger_: Logger) -> None: + def logger(self, logger_: Union[Logger, AgentLoggerAdapter]) -> None: assert self._logger is None, "Logger already set." self._logger = logger_ @@ -683,6 +684,21 @@ def from_dir(cls, directory: str, agent_context: AgentContext) -> "Skill": configuration.directory = Path(directory) return Skill.from_config(configuration, agent_context) + @property + def logger(self) -> Union[Logger, LoggerAdapter]: + """ + Get the logger. + + In the case of a skill, return the + logger provided by the skill context. + """ + return self.skill_context.logger + + @logger.setter + def logger(self, *args) -> None: + """Set the logger.""" + raise ValueError("Cannot set logger to a skill component..") + @classmethod def from_config( cls, configuration: SkillConfig, agent_context: AgentContext @@ -704,7 +720,10 @@ def from_config( skill_context = SkillContext() skill_context.set_agent_context(agent_context) logger_name = f"aea.packages.{configuration.author}.skills.{configuration.name}" - skill_context.logger = logging.getLogger(logger_name) + logger = AgentLoggerAdapter( + logging.getLogger(logger_name), agent_context.agent_name + ) + skill_context.logger = logger skill = Skill(configuration, skill_context) diff --git a/aea/skills/tasks.py b/aea/skills/tasks.py index 5f68adebcb..50a0e5a15b 100644 --- a/aea/skills/tasks.py +++ b/aea/skills/tasks.py @@ -25,6 +25,8 @@ from multiprocessing.pool import AsyncResult, Pool from typing import Any, Callable, Dict, Optional, Sequence, cast +from aea.helpers.logging import WithLogger + logger = logging.getLogger(__name__) @@ -120,7 +122,7 @@ def init_worker() -> None: # signal.signal(signal.CTRL_C_EVENT, signal.SIG_IGN) -class TaskManager: +class TaskManager(WithLogger): """A Task manager.""" def __init__(self, nb_workers: int = 1, is_lazy_pool_start: bool = True): @@ -130,6 +132,7 @@ def __init__(self, nb_workers: int = 1, is_lazy_pool_start: bool = True): :param nb_workers: the number of worker processes. :param is_lazy_pool_start: option to postpone pool creation till the first enqueue_task called. """ + WithLogger.__init__(self, logger) self._nb_workers = nb_workers self._is_lazy_pool_start = is_lazy_pool_start self._pool = None # type: Optional[Pool] @@ -207,9 +210,9 @@ def start(self) -> None: """ with self._lock: if self._stopped is False: - logger.debug("Task manager already running.") + self.logger.debug("Task manager already running.") else: - logger.debug("Start the task manager.") + self.logger.debug("Start the task manager.") self._stopped = False if not self._is_lazy_pool_start: self._start_pool() @@ -222,9 +225,9 @@ def stop(self) -> None: """ with self._lock: if self._stopped is True: - logger.debug("Task manager already stopped.") + self.logger.debug("Task manager already stopped.") else: - logger.debug("Stop the task manager.") + self.logger.debug("Stop the task manager.") self._stopped = True self._stop_pool() @@ -237,7 +240,7 @@ def _start_pool(self) -> None: :return: None """ if self._pool: - logger.debug("Pool was already started!.") + self.logger.debug("Pool was already started!.") return self._pool = Pool(self._nb_workers, initializer=init_worker) @@ -248,7 +251,7 @@ def _stop_pool(self) -> None: :return: None """ if not self._pool: - logger.debug("Pool is not started!.") + self.logger.debug("Pool is not started!.") return self._pool = cast(Pool, self._pool) diff --git a/docs/aries-cloud-agent-demo.md b/docs/aries-cloud-agent-demo.md index 42eba9f067..0c5697a03b 100644 --- a/docs/aries-cloud-agent-demo.md +++ b/docs/aries-cloud-agent-demo.md @@ -187,8 +187,8 @@ aea config set --type int vendor.fetchai.skills.aries_alice.handlers.aries_demo_ Add `http_client`, `oef` and `webhook` connections: ``` bash -aea add connection fetchai/http_client:0.4.0 -aea add connection fetchai/webhook:0.3.0 +aea add connection fetchai/http_client:0.5.0 +aea add connection fetchai/webhook:0.4.0 aea add connection fetchai/oef:0.6.0 ``` @@ -323,8 +323,8 @@ aea config set vendor.fetchai.skills.aries_faber.handlers.aries_demo_http.args.a Add `http_client`, `oef` and `webhook` connections: ``` bash -aea add connection fetchai/http_client:0.4.0 -aea add connection fetchai/webhook:0.3.0 +aea add connection fetchai/http_client:0.5.0 +aea add connection fetchai/webhook:0.4.0 aea add connection fetchai/oef:0.6.0 ``` @@ -347,7 +347,7 @@ aea config set vendor.fetchai.connections.webhook.config.webhook_url_path /webho Now you must ensure **Faber_AEA**'s default connection is `http_client`. ``` bash -aea config set agent.default_connection fetchai/http_client:0.4.0 +aea config set agent.default_connection fetchai/http_client:0.5.0 ``` ### Alice_AEA -- Method 2: Fetch the Agent diff --git a/docs/connect-a-frontend.md b/docs/connect-a-frontend.md index 527835c462..5659b9d3d3 100644 --- a/docs/connect-a-frontend.md +++ b/docs/connect-a-frontend.md @@ -3,7 +3,7 @@ This demo discusses the options we have to connect a front-end to the AEA. The f How to connect frontend to your AEA ## Case 1 -The first option we have is to create a `Connection` that will handle the incoming requests from the rest API. In this scenario, the rest API communicates with the AEA and requests are handled by the `HTTP Server` Connection package. The rest API should send CRUD requests to the `HTTP Server` Connection (`fetchai/http_server:0.4.0`) which translates these into Envelopes to be consumed by the correct skill. +The first option we have is to create a `Connection` that will handle the incoming requests from the rest API. In this scenario, the rest API communicates with the AEA and requests are handled by the `HTTP Server` Connection package. The rest API should send CRUD requests to the `HTTP Server` Connection (`fetchai/http_server:0.5.0`) which translates these into Envelopes to be consumed by the correct skill. ## Case 2 The other option we have is to create a stand-alone `Multiplexer` with an `OEF` connection (`fetchai/oef:0.6.0`). In this scenario, the front-end needs to incorporate a Multiplexer with an `OEF` Connection. Then the [OEF communication node](../oef-ledger) can be used to send Envelopes from the AEA to the front-end. diff --git a/docs/http-connection-and-skill.md b/docs/http-connection-and-skill.md index 24d20506b0..efd2f4c806 100644 --- a/docs/http-connection-and-skill.md +++ b/docs/http-connection-and-skill.md @@ -14,13 +14,13 @@ cd my_aea Add the http server connection package ``` bash -aea add connection fetchai/http_server:0.4.0 +aea add connection fetchai/http_server:0.5.0 ``` Update the default connection: ``` bash -aea config set agent.default_connection fetchai/http_server:0.4.0 +aea config set agent.default_connection fetchai/http_server:0.5.0 ``` Modify the `api_spec_path`: diff --git a/packages/fetchai/agents/aries_alice/aea-config.yaml b/packages/fetchai/agents/aries_alice/aea-config.yaml index 4b4dcda0eb..e65c57d446 100644 --- a/packages/fetchai/agents/aries_alice/aea-config.yaml +++ b/packages/fetchai/agents/aries_alice/aea-config.yaml @@ -7,10 +7,10 @@ aea_version: '>=0.5.0, <0.6.0' fingerprint: {} fingerprint_ignore_patterns: [] connections: -- fetchai/http_client:0.4.0 +- fetchai/http_client:0.5.0 - fetchai/oef:0.6.0 - fetchai/stub:0.6.0 -- fetchai/webhook:0.3.0 +- fetchai/webhook:0.4.0 contracts: [] protocols: - fetchai/default:0.3.0 diff --git a/packages/fetchai/agents/aries_faber/aea-config.yaml b/packages/fetchai/agents/aries_faber/aea-config.yaml index fe8f7b948f..7c527e099f 100644 --- a/packages/fetchai/agents/aries_faber/aea-config.yaml +++ b/packages/fetchai/agents/aries_faber/aea-config.yaml @@ -7,10 +7,10 @@ aea_version: '>=0.5.0, <0.6.0' fingerprint: {} fingerprint_ignore_patterns: [] connections: -- fetchai/http_client:0.4.0 +- fetchai/http_client:0.5.0 - fetchai/oef:0.6.0 - fetchai/stub:0.6.0 -- fetchai/webhook:0.3.0 +- fetchai/webhook:0.4.0 contracts: [] protocols: - fetchai/default:0.3.0 @@ -20,7 +20,7 @@ protocols: skills: - fetchai/aries_faber:0.3.0 - fetchai/error:0.3.0 -default_connection: fetchai/http_client:0.4.0 +default_connection: fetchai/http_client:0.5.0 default_ledger: fetchai ledger_apis: {} logging_config: diff --git a/packages/fetchai/connections/gym/connection.py b/packages/fetchai/connections/gym/connection.py index 9ec55d4115..3b53544934 100644 --- a/packages/fetchai/connections/gym/connection.py +++ b/packages/fetchai/connections/gym/connection.py @@ -24,7 +24,7 @@ from asyncio import CancelledError from asyncio.events import AbstractEventLoop from concurrent.futures.thread import ThreadPoolExecutor -from typing import Optional, cast +from typing import Optional, Union, cast import gym @@ -57,6 +57,7 @@ def __init__(self, address: Address, gym_env: gym.Env): self._threaded_pool: ThreadPoolExecutor = ThreadPoolExecutor( self.THREAD_POOL_SIZE ) + self.logger: Union[logging.Logger, logging.LoggerAdapter] = logger @property def queue(self) -> asyncio.Queue: @@ -83,7 +84,7 @@ async def send(self, envelope: Envelope) -> None: :return: None """ sender = envelope.sender - logger.debug("Processing message from {}: {}".format(sender, envelope)) + self.logger.debug("Processing message from {}: {}".format(sender, envelope)) if envelope.protocol_id != GymMessage.protocol_id: raise ValueError("This protocol is not valid for gym.") await self.handle_gym_message(envelope) @@ -183,6 +184,7 @@ async def connect(self) -> None: """ if not self.connection_status.is_connected: self.connection_status.is_connected = True + self.channel.logger = self.logger await self.channel.connect() async def disconnect(self) -> None: diff --git a/packages/fetchai/connections/gym/connection.yaml b/packages/fetchai/connections/gym/connection.yaml index 10f02515ae..7515f475b5 100644 --- a/packages/fetchai/connections/gym/connection.yaml +++ b/packages/fetchai/connections/gym/connection.yaml @@ -6,7 +6,7 @@ license: Apache-2.0 aea_version: '>=0.5.0, <0.6.0' fingerprint: __init__.py: QmWwxj1hGGZNteCvRtZxwtY9PuEKsrWsEmMWCKwiYCdvRR - connection.py: QmV2REDadG36ogXD3eW4Ms82gUfWdjAQJcNJ6ik48P1CC4 + connection.py: QmZHUedJDmV2X1kXcjjyZHwWbwV3553QEKSUYcK6NTtr4F fingerprint_ignore_patterns: [] protocols: - fetchai/gym:0.3.0 diff --git a/packages/fetchai/connections/http_client/connection.py b/packages/fetchai/connections/http_client/connection.py index 7d18ea8b4d..ce7f7f20ee 100644 --- a/packages/fetchai/connections/http_client/connection.py +++ b/packages/fetchai/connections/http_client/connection.py @@ -40,7 +40,7 @@ NOT_FOUND = 404 REQUEST_TIMEOUT = 408 SERVER_ERROR = 500 -PUBLIC_ID = PublicId.from_str("fetchai/http_client:0.4.0") +PUBLIC_ID = PublicId.from_str("fetchai/http_client:0.5.0") logger = logging.getLogger("aea.packages.fetchai.connections.http_client") @@ -85,9 +85,11 @@ def __init__( ) # type: Optional[asyncio.AbstractEventLoop] # pragma: no cover self.excluded_protocols = excluded_protocols self.is_stopped = True - logger.info("Initialised the HTTP client channel") self._tasks: Set[Task] = set() + self.logger = logger + self.logger.info("Initialised the HTTP client channel") + async def connect(self, loop: AbstractEventLoop) -> None: """ Connect channel using loop. @@ -160,7 +162,7 @@ async def _perform_http_request( await resp.read() return resp except Exception: # pragma: nocover # pylint: disable=broad-except - logger.exception( + self.logger.exception( f"Exception raised during http call: {request_http_message.method} {request_http_message.url}" ) raise @@ -186,7 +188,7 @@ def send(self, request_envelope: Envelope) -> None: return if request_envelope.protocol_id in (self.excluded_protocols or []): - logger.error( + self.logger.error( "This envelope cannot be sent with the http client connection: protocol_id={}".format( request_envelope.protocol_id ) @@ -202,7 +204,7 @@ def send(self, request_envelope: Envelope) -> None: if ( request_http_message.performative != HttpMessage.Performative.REQUEST ): # pragma: nocover - logger.warning( + self.logger.warning( "The HTTPMessage performative must be a REQUEST. Envelop dropped." ) return @@ -222,7 +224,7 @@ def _task_done_callback(self, task: Task) -> None: :return: None """ self._tasks.remove(task) - logger.debug(f"Task completed: {task}") + self.logger.debug(f"Task completed: {task}") async def get_message(self) -> Union["Envelope", None]: """ @@ -298,7 +300,7 @@ async def _cancel_tasks(self) -> None: async def disconnect(self) -> None: """Disconnect.""" if not self.is_stopped: - logger.info("HTTP Client has shutdown on port: {}.".format(self.port)) + self.logger.info("HTTP Client has shutdown on port: {}.".format(self.port)) self.is_stopped = True await self._cancel_tasks() @@ -331,6 +333,7 @@ async def connect(self) -> None: """ if not self.connection_status.is_connected: self.connection_status.is_connected = True + self.channel.logger = self.logger await self.channel.connect(self._loop) async def disconnect(self) -> None: @@ -369,5 +372,5 @@ async def receive(self, *args, **kwargs) -> Optional[Union["Envelope", None]]: try: return await self.channel.get_message() except Exception: # pragma: nocover # pylint: disable=broad-except - logger.exception("Exception on receive") + self.logger.exception("Exception on receive") return None diff --git a/packages/fetchai/connections/http_client/connection.yaml b/packages/fetchai/connections/http_client/connection.yaml index 6b882aca04..5c05c1312c 100644 --- a/packages/fetchai/connections/http_client/connection.yaml +++ b/packages/fetchai/connections/http_client/connection.yaml @@ -1,13 +1,13 @@ name: http_client author: fetchai -version: 0.4.0 +version: 0.5.0 description: The HTTP_client connection that wraps a web-based client connecting to a RESTful API specification. license: Apache-2.0 aea_version: '>=0.5.0, <0.6.0' fingerprint: __init__.py: QmPdKAks8A6XKAgZiopJzPZYXJumTeUqChd8UorqmLQQPU - connection.py: QmancYRcofdt3wSti4RymqTNWYbLtnbjxKYpB4z2LERrWd + connection.py: QmVYurcnjuRTK6CnuEc6qNbSykmZEzRMkjyGhknJKzKRQt fingerprint_ignore_patterns: [] protocols: - fetchai/http:0.3.0 diff --git a/packages/fetchai/connections/http_server/connection.py b/packages/fetchai/connections/http_server/connection.py index 5d4568bbc7..496443ed43 100644 --- a/packages/fetchai/connections/http_server/connection.py +++ b/packages/fetchai/connections/http_server/connection.py @@ -67,7 +67,7 @@ logger = logging.getLogger("aea.packages.fetchai.connections.http_server") RequestId = str -PUBLIC_ID = PublicId.from_str("fetchai/http_server:0.4.0") +PUBLIC_ID = PublicId.from_str("fetchai/http_server:0.5.0") def headers_to_string(headers: Dict): @@ -348,6 +348,8 @@ def __init__( self.http_server: Optional[web.TCPSite] = None self.pending_requests: Dict[RequestId, Future] = {} + self.logger = logger + @property def api_spec(self) -> APISpec: """Get the api spec.""" @@ -368,11 +370,13 @@ async def connect(self, loop: AbstractEventLoop) -> None: try: await self._start_http_server() - logger.info("HTTP Server has connected to port: {}.".format(self.port)) + self.logger.info( + "HTTP Server has connected to port: {}.".format(self.port) + ) except Exception: # pragma: nocover # pylint: disable=broad-except self.is_stopped = True self._in_queue = None - logger.exception( + self.logger.exception( "Failed to start server on {}:{}.".format(self.host, self.port) ) @@ -390,7 +394,7 @@ async def _http_handler(self, http_request: BaseRequest) -> Response: is_valid_request = self.api_spec.verify(request) if not is_valid_request: - logger.warning(f"request is not valid: {request}") + self.logger.warning(f"request is not valid: {request}") return Response(status=NOT_FOUND, reason="Request Not Found") try: @@ -433,7 +437,7 @@ def send(self, envelope: Envelope) -> None: assert self.http_server is not None, "Server not connected, call connect first!" if envelope.protocol_id not in self.restricted_to_protocols: - logger.error( + self.logger.error( "This envelope cannot be sent with the http connection: protocol_id={}".format( envelope.protocol_id ) @@ -443,7 +447,7 @@ def send(self, envelope: Envelope) -> None: future = self.pending_requests.pop(envelope.to, None) if not future: - logger.warning( + self.logger.warning( "Dropping envelope for request id {} which has timed out.".format( envelope.to ) @@ -461,7 +465,7 @@ async def disconnect(self) -> None: if not self.is_stopped: await self.http_server.stop() - logger.info("HTTP Server has shutdown on port: {}.".format(self.port)) + self.logger.info("HTTP Server has shutdown on port: {}.".format(self.port)) self.is_stopped = True self._in_queue = None @@ -494,6 +498,7 @@ async def connect(self) -> None: :return: None """ if not self.connection_status.is_connected: + self.channel.logger = self.logger await self.channel.connect(loop=self.loop) self.connection_status.is_connected = not self.channel.is_stopped diff --git a/packages/fetchai/connections/http_server/connection.yaml b/packages/fetchai/connections/http_server/connection.yaml index 308c398eb5..e020af1227 100644 --- a/packages/fetchai/connections/http_server/connection.yaml +++ b/packages/fetchai/connections/http_server/connection.yaml @@ -1,13 +1,13 @@ name: http_server author: fetchai -version: 0.4.0 +version: 0.5.0 description: The HTTP server connection that wraps http server implementing a RESTful API specification. license: Apache-2.0 aea_version: '>=0.5.0, <0.6.0' fingerprint: __init__.py: Qmb6JEAkJeb5JweqrSGiGoQp1vGXqddjGgb9WMkm2phTgA - connection.py: Qmf1GFFhq4LQXLGizrp6nMDy4R7XRoqEayzqaEaxuToVnu + connection.py: QmTDwwg4Qah191WaiFizdhGGDs56jha26NWcjGkmDTDt5q fingerprint_ignore_patterns: [] protocols: - fetchai/http:0.3.0 diff --git a/packages/fetchai/connections/local/connection.py b/packages/fetchai/connections/local/connection.py index 5d2a0a90e5..dc82107b24 100644 --- a/packages/fetchai/connections/local/connection.py +++ b/packages/fetchai/connections/local/connection.py @@ -42,7 +42,7 @@ RESPONSE_MESSAGE_ID = MESSAGE_ID + 1 STUB_DIALOGUE_ID = 0 DEFAULT_OEF = "default_oef" -PUBLIC_ID = PublicId.from_str("fetchai/local:0.3.0") +PUBLIC_ID = PublicId.from_str("fetchai/local:0.4.0") class LocalNode: @@ -368,9 +368,9 @@ async def receive(self, *args, **kwargs) -> Optional["Envelope"]: assert self._reader is not None envelope = await self._reader.get() if envelope is None: - logger.debug("Receiving task terminated.") + self.logger.debug("Receiving task terminated.") return None - logger.debug("Received envelope {}".format(envelope)) + self.logger.debug("Received envelope {}".format(envelope)) return envelope except Exception: # pragma: nocover # pylint: disable=broad-except return None diff --git a/packages/fetchai/connections/local/connection.yaml b/packages/fetchai/connections/local/connection.yaml index 7b28ee2632..57c124dd66 100644 --- a/packages/fetchai/connections/local/connection.yaml +++ b/packages/fetchai/connections/local/connection.yaml @@ -1,12 +1,12 @@ name: local author: fetchai -version: 0.3.0 +version: 0.4.0 description: The local connection provides a stub for an OEF node. license: Apache-2.0 aea_version: '>=0.5.0, <0.6.0' fingerprint: __init__.py: QmeeoX5E38Ecrb1rLdeFyyxReHLrcJoETnBcPbcNWVbiKG - connection.py: QmarTwASoQC365c6yCydYVB7524ELwJbXfHmh5qUPEEtec + connection.py: QmTNcjJSBWRrB5srBTEpjRfbvDuxJtsFcdhYJ1UYsLGqKT fingerprint_ignore_patterns: [] protocols: - fetchai/oef_search:0.3.0 diff --git a/packages/fetchai/connections/oef/connection.py b/packages/fetchai/connections/oef/connection.py index aca167c6c9..ecba75c4e5 100644 --- a/packages/fetchai/connections/oef/connection.py +++ b/packages/fetchai/connections/oef/connection.py @@ -139,6 +139,8 @@ def __init__( self._threaded_pool = ThreadPoolExecutor(self.THREAD_POOL_SIZE) + self.aea_logger = logger + async def _run_in_executor(self, fn, *args): return await self._loop.run_in_executor(self._threaded_pool, fn, *args) @@ -193,7 +195,7 @@ def on_cfp( :return: None """ self._check_loop_and_queue() - logger.warning( + self.aea_logger.warning( "Dropping incompatible on_cfp: msg_id={}, dialogue_id={}, origin={}, target={}, query={}".format( msg_id, dialogue_id, origin, target, query ) @@ -218,7 +220,7 @@ def on_propose( :return: None """ self._check_loop_and_queue() - logger.warning( + self.aea_logger.warning( "Dropping incompatible on_propose: msg_id={}, dialogue_id={}, origin={}, target={}".format( msg_id, dialogue_id, origin, target ) @@ -237,7 +239,7 @@ def on_accept( :return: None """ self._check_loop_and_queue() - logger.warning( + self.aea_logger.warning( "Dropping incompatible on_accept: msg_id={}, dialogue_id={}, origin={}, target={}".format( msg_id, dialogue_id, origin, target ) @@ -256,7 +258,7 @@ def on_decline( :return: None """ self._check_loop_and_queue() - logger.warning( + self.aea_logger.warning( "Dropping incompatible on_decline: msg_id={}, dialogue_id={}, origin={}, target={}".format( msg_id, dialogue_id, origin, target ) @@ -273,13 +275,13 @@ def on_search_result(self, search_id: int, agents: List[Address]) -> None: self._check_loop_and_queue() oef_search_dialogue = self.oef_msg_id_to_dialogue.pop(search_id, None) if oef_search_dialogue is None: - logger.warning( + self.aea_logger.warning( "Could not find dialogue for search_id={}".format(search_id) ) # pragma: nocover return # pragma: nocover last_msg = oef_search_dialogue.last_incoming_message if last_msg is None: - logger.warning("Could not find last message.") # pragma: nocover + self.aea_logger.warning("Could not find last message.") # pragma: nocover return # pragma: nocover msg = OefSearchMessage( performative=OefSearchMessage.Performative.SEARCH_RESULT, @@ -315,13 +317,13 @@ def on_oef_error( operation = OefSearchMessage.OefErrorOperation.OTHER oef_search_dialogue = self.oef_msg_id_to_dialogue.pop(answer_id, None) if oef_search_dialogue is None: - logger.warning( + self.aea_logger.warning( "Could not find dialogue for answer_id={}".format(answer_id) ) # pragma: nocover return # pragma: nocover last_msg = oef_search_dialogue.last_incoming_message if last_msg is None: - logger.warning("Could not find last message.") # pragma: nocover + self.aea_logger.warning("Could not find last message.") # pragma: nocover return # pragma: nocover msg = OefSearchMessage( performative=OefSearchMessage.Performative.OEF_ERROR, @@ -378,7 +380,7 @@ def send(self, envelope: Envelope) -> None: """ if self.excluded_protocols is not None: # pragma: nocover if envelope.protocol_id in self.excluded_protocols: - logger.error( + self.aea_logger.error( "This envelope cannot be sent with the oef connection: protocol_id={}".format( envelope.protocol_id ) @@ -412,7 +414,7 @@ def send_oef_message(self, envelope: Envelope) -> None: OefSearchDialogue, self.oef_search_dialogues.update(oef_message) ) if oef_search_dialogue is None: - logger.warning( + self.aea_logger.warning( "Could not create dialogue for message={}".format(oef_message) ) # pragma: nocover return # pragma: nocover @@ -443,7 +445,7 @@ def handle_failure( # pylint: disable=no-self-use self, exception: Exception, conn ) -> None: """Handle failure.""" - logger.exception(exception) # pragma: nocover + self.aea_logger.exception(exception) # pragma: nocover async def _set_loop_and_queue(self): self._loop = asyncio.get_event_loop() @@ -474,7 +476,9 @@ async def connect(self) -> None: # pylint: disable=invalid-overridden-method ) if is_connected: return - logger.warning("Cannot connect to OEFChannel. Retrying in 5 seconds...") + self.aea_logger.warning( + "Cannot connect to OEFChannel. Retrying in 5 seconds..." + ) await asyncio.sleep(self.CONNECT_RETRY_DELAY) raise ValueError("Connect attempts limit!") # pragma: nocover @@ -538,6 +542,7 @@ async def connect(self) -> None: if self.connection_status.is_connected: return try: + self.channel.aea_logger = self.logger self.connection_status.is_connecting = True self._loop = asyncio.get_event_loop() await self.channel.connect() @@ -563,12 +568,14 @@ async def _connection_check(self) -> None: if not self.channel.get_state() == "connected": # pragma: no cover self.connection_status.is_connected = False self.connection_status.is_connecting = True - logger.warning( + self.logger.warning( "Lost connection to OEFChannel. Retrying to connect soon ..." ) await self.channel.connect() self.connection_status.is_connected = True - logger.warning("Successfully re-established connection to OEFChannel.") + self.logger.warning( + "Successfully re-established connection to OEFChannel." + ) async def disconnect(self) -> None: """ @@ -595,15 +602,15 @@ async def receive(self, *args, **kwargs) -> Optional["Envelope"]: try: envelope = await self.channel.get() if envelope is None: - logger.debug("Received None.") + self.logger.debug("Received None.") return None - logger.debug("Received envelope: {}".format(envelope)) + self.logger.debug("Received envelope: {}".format(envelope)) return envelope except CancelledError: - logger.debug("Receive cancelled.") + self.logger.debug("Receive cancelled.") return None except Exception as e: # pragma: nocover # pylint: disable=broad-except - logger.exception(e) + self.logger.exception(e) return None async def send(self, envelope: "Envelope") -> None: diff --git a/packages/fetchai/connections/oef/connection.yaml b/packages/fetchai/connections/oef/connection.yaml index bfcad68935..9e08dbe7cf 100644 --- a/packages/fetchai/connections/oef/connection.yaml +++ b/packages/fetchai/connections/oef/connection.yaml @@ -7,7 +7,7 @@ license: Apache-2.0 aea_version: '>=0.5.0, <0.6.0' fingerprint: __init__.py: QmUAen8tmoBHuCerjA3FSGKJRLG6JYyUS3chuWzPxKYzez - connection.py: QmQyAhHBp62xyzxuA5FJBYXgnLQnjH5gpmit2bzva9Uvu9 + connection.py: QmXutRqmffjc9xL6F8bGQ9dBPkZUP6GRZUtxsKzmdmd8G6 object_translator.py: QmNYd7ikc3nYZMCXjyfen2nENHpNCZws44MNEDbzAsHrGu fingerprint_ignore_patterns: [] protocols: diff --git a/packages/fetchai/connections/p2p_client/connection.py b/packages/fetchai/connections/p2p_client/connection.py index fd9e53c2bc..ac7f817749 100644 --- a/packages/fetchai/connections/p2p_client/connection.py +++ b/packages/fetchai/connections/p2p_client/connection.py @@ -35,7 +35,7 @@ logger = logging.getLogger("aea.packages.fetchai.connections.p2p_client") -PUBLIC_ID = PublicId.from_str("fetchai/p2p_client:0.2.0") +PUBLIC_ID = PublicId.from_str("fetchai/p2p_client:0.3.0") class PeerToPeerChannel: @@ -63,7 +63,8 @@ def __init__( self.thread = Thread(target=self.receiving_loop) self.lock = threading.Lock() self.stopped = True - logger.info("Initialised the peer to peer channel") + self.logger = logger + self.logger.info("Initialised the peer to peer channel") def connect(self): """ @@ -78,18 +79,18 @@ def connect(self): ) self.stopped = False self.thread.start() - logger.debug("P2P Channel is connected.") + self.logger.debug("P2P Channel is connected.") self.try_register() def try_register(self) -> bool: """Try to register to the provider.""" try: assert self._httpCall is not None - logger.info(self.address) + self.logger.info(self.address) query = self._httpCall.register(sender_address=self.address, mailbox=True) return query["status"] == "OK" except Exception: # pragma: no cover - logger.warning("Could not register to the provider.") + self.logger.warning("Could not register to the provider.") raise AEAConnectionError() def send(self, envelope: Envelope) -> None: @@ -103,7 +104,7 @@ def send(self, envelope: Envelope) -> None: if self.excluded_protocols is not None: if envelope.protocol_id in self.excluded_protocols: # pragma: nocover - logger.error( + self.logger.error( "This envelope cannot be sent with the oef connection: protocol_id={}".format( envelope.protocol_id ) @@ -128,7 +129,7 @@ def receiving_loop(self) -> None: sender_address=self.address ) # type: List[Dict[str, Any]] for message in messages: - logger.debug("Received message: {}".format(message)) + self.logger.debug("Received message: {}".format(message)) envelope = Envelope( to=message["TO"]["RECEIVER_ADDRESS"], sender=message["FROM"]["SENDER_ADDRESS"], @@ -137,7 +138,7 @@ def receiving_loop(self) -> None: ) self.loop.call_soon_threadsafe(self.in_queue.put_nowait, envelope) time.sleep(0.5) - logger.debug("Receiving loop stopped.") + self.logger.debug("Receiving loop stopped.") def disconnect(self) -> None: """ @@ -174,6 +175,7 @@ async def connect(self) -> None: :return: None """ if not self.connection_status.is_connected: + self.channel.logger = self.logger self.connection_status.is_connected = True self.channel.in_queue = asyncio.Queue() self.channel.loop = self.loop diff --git a/packages/fetchai/connections/p2p_client/connection.yaml b/packages/fetchai/connections/p2p_client/connection.yaml index 57cb12100c..69363f811b 100644 --- a/packages/fetchai/connections/p2p_client/connection.yaml +++ b/packages/fetchai/connections/p2p_client/connection.yaml @@ -1,13 +1,13 @@ name: p2p_client author: fetchai -version: 0.2.0 +version: 0.3.0 description: The p2p_client connection provides a connection with the fetch.ai mail provider. license: Apache-2.0 aea_version: '>=0.5.0, <0.6.0' fingerprint: __init__.py: QmdwnPo8iC2uqf9CmB4ocbh6HP2jcgCtuFdS4djuajp6Li - connection.py: QmUbUbv9xVM9r9GaND4KNgFPzQwnujEUcTEZWvsAiTvzGY + connection.py: QmcCcazqG4BXymHdHbmgvUwCkYzLG8UvduRb258opxSupP fingerprint_ignore_patterns: [] protocols: [] class_name: PeerToPeerConnection diff --git a/packages/fetchai/connections/p2p_libp2p/connection.py b/packages/fetchai/connections/p2p_libp2p/connection.py index 7923621cc0..e53c94d35c 100644 --- a/packages/fetchai/connections/p2p_libp2p/connection.py +++ b/packages/fetchai/connections/p2p_libp2p/connection.py @@ -282,6 +282,8 @@ def __init__( self._reader_protocol = None # type: Optional[asyncio.StreamReaderProtocol] self._fileobj = None # type: Optional[IO[str]] + self.logger = logger + @property def reader_protocol(self) -> asyncio.StreamReaderProtocol: """Get reader protocol.""" @@ -305,7 +307,7 @@ async def start(self) -> None: logger.info("Downloading golang dependencies. This may take a while...") returncode = await _golang_module_build_async(self.source, self._log_file_desc) with open(self.log_file, "r") as f: - logger.debug(f.read()) + self.logger.debug(f.read()) node_log = "" with open(self.log_file, "r") as f: node_log = f.read() @@ -315,12 +317,12 @@ async def start(self) -> None: returncode, node_log ) ) - logger.info("Finished downloading golang dependencies.") + self.logger.info("Finished downloading golang dependencies.") # setup fifos in_path = self.libp2p_to_aea_path out_path = self.aea_to_libp2p_path - logger.debug("Creating pipes ({}, {})...".format(in_path, out_path)) + self.logger.debug("Creating pipes ({}, {})...".format(in_path, out_path)) if os.path.exists(in_path): os.remove(in_path) # pragma: no cover if os.path.exists(out_path): @@ -361,12 +363,12 @@ async def start(self) -> None: ) # run node - logger.info("Starting libp2p node...") + self.logger.info("Starting libp2p node...") self.proc = _golang_module_run( self.source, LIBP2P_NODE_MODULE_NAME, [self.env_file], self._log_file_desc ) - logger.info("Connecting to libp2p node...") + self.logger.info("Connecting to libp2p node...") await self._connect() async def _connect(self) -> None: @@ -377,13 +379,13 @@ async def _connect(self) -> None: """ if self._connection_attempts == 1: with open(self.log_file, "r") as f: - logger.debug("Couldn't connect to libp2p p2p process, logs:") - logger.debug(f.read()) + self.logger.debug("Couldn't connect to libp2p p2p process, logs:") + self.logger.debug(f.read()) raise Exception("Couldn't connect to libp2p p2p process") # TOFIX(LR) use proper exception self._connection_attempts -= 1 - logger.debug( + self.logger.debug( "Attempt opening pipes {}, {}...".format( self.libp2p_to_aea_path, self.aea_to_libp2p_path ) @@ -419,9 +421,9 @@ async def _connect(self) -> None: self._fileobj = os.fdopen(self._libp2p_to_aea, "r") await self._loop.connect_read_pipe(lambda: self.reader_protocol, self._fileobj) - logger.info("Successfully connected to libp2p node!") + self.logger.info("Successfully connected to libp2p node!") self.multiaddrs = self.get_libp2p_node_multiaddrs() - logger.info("My libp2p addresses: {}".format(self.multiaddrs)) + self.logger.info("My libp2p addresses: {}".format(self.multiaddrs)) @asyncio.coroutine def write(self, data: bytes) -> None: @@ -445,7 +447,7 @@ async def read(self) -> Optional[bytes]: self._stream_reader is not None ), "StreamReader not set, call connect first!" try: - logger.debug("Waiting for messages...") + self.logger.debug("Waiting for messages...") buf = await self._stream_reader.readexactly(4) if not buf: # pragma: no cover return None @@ -455,7 +457,7 @@ async def read(self) -> Optional[bytes]: return None return data except asyncio.streams.IncompleteReadError as e: - logger.info( + self.logger.info( "Connection disconnected while reading from node ({}/{})".format( len(e.partial), e.expected ) @@ -500,14 +502,14 @@ def stop(self) -> None: """ # TOFIX(LR) wait is blocking and proc can ignore terminate if self.proc is not None: - logger.debug("Terminating node process {}...".format(self.proc.pid)) + self.logger.debug("Terminating node process {}...".format(self.proc.pid)) self.proc.terminate() - logger.debug( + self.logger.debug( "Waiting for node process {} to terminate...".format(self.proc.pid) ) self.proc.wait() else: - logger.debug("Called stop when process not set!") # pragma: no cover + self.logger.debug("Called stop when process not set!") # pragma: no cover if os.path.exists(LIBP2P_NODE_ENV_FILE): os.remove(LIBP2P_NODE_ENV_FILE) @@ -627,6 +629,7 @@ async def connect(self) -> None: try: # start libp2p node self.connection_status.is_connecting = True + self.node.logger = self.logger await self.node.start() self.connection_status.is_connecting = False self.connection_status.is_connected = True @@ -660,7 +663,7 @@ async def disconnect(self) -> None: if self._in_queue is not None: self._in_queue.put_nowait(None) else: - logger.debug("Called disconnect when input queue not initialized.") + self.logger.debug("Called disconnect when input queue not initialized.") async def receive(self, *args, **kwargs) -> Optional["Envelope"]: """ @@ -672,18 +675,18 @@ async def receive(self, *args, **kwargs) -> Optional["Envelope"]: assert self._in_queue is not None, "Input queue not initialized." data = await self._in_queue.get() if data is None: - logger.debug("Received None.") + self.logger.debug("Received None.") self.node.stop() self.connection_status.is_connected = False return None # TOFIX(LR) attempt restarting the node? - logger.debug("Received data: {}".format(data)) + self.logger.debug("Received data: {}".format(data)) return Envelope.decode(data) except CancelledError: # pragma: no cover - logger.debug("Receive cancelled.") + self.logger.debug("Receive cancelled.") return None except Exception as e: # pragma: nocover # pylint: disable=broad-except - logger.exception(e) + self.logger.exception(e) return None async def send(self, envelope: Envelope): diff --git a/packages/fetchai/connections/p2p_libp2p/connection.yaml b/packages/fetchai/connections/p2p_libp2p/connection.yaml index 8372f510d4..4d4d0edede 100644 --- a/packages/fetchai/connections/p2p_libp2p/connection.yaml +++ b/packages/fetchai/connections/p2p_libp2p/connection.yaml @@ -11,7 +11,7 @@ fingerprint: aea/api.go: QmW5fUpVZmV3pxgoakm3RvsvCGC6FwT2XprcqXHM8rBXP5 aea/envelope.pb.go: QmRfUNGpCeVJfsW3H1MzCN4pwDWgumfyWufVFp6xvUjjug aea/envelope.proto: QmSC8EGCKiNFR2vf5bSWymSzYDFMipQW9aQVMwPzQoKb4n - connection.py: QmaGCXNSyuW5MP428XB1FxUt6sAYdsNrh13UWLDFeBoP7b + connection.py: QmeVHd5imDjEJB7qqcdudurbacoDkZphC5hetAaJkhFCkk dht/dhtclient/dhtclient.go: QmNnU1pVCUtj8zJ1Pz5eMk9sznsjPFSJ9qDkzbrNwzEecV dht/dhtclient/dhtclient_test.go: QmPfnHSHXtbaW5VYuq1QsKQWey64pUEvLEaKKkT9eAcmws dht/dhtclient/options.go: QmPorj38wNrxGrzsbFe5wwLmiHzxbTJ2VsgvSd8tLDYS8s diff --git a/packages/fetchai/connections/p2p_libp2p_client/connection.py b/packages/fetchai/connections/p2p_libp2p_client/connection.py index dae9a2f474..0b96d8fe46 100644 --- a/packages/fetchai/connections/p2p_libp2p_client/connection.py +++ b/packages/fetchai/connections/p2p_libp2p_client/connection.py @@ -34,7 +34,7 @@ logger = logging.getLogger("aea.packages.fetchai.connections.p2p_libp2p_client") -PUBLIC_ID = PublicId.from_str("fetchai/p2p_libp2p_client:0.2.0") +PUBLIC_ID = PublicId.from_str("fetchai/p2p_libp2p_client:0.3.0") class Uri: @@ -165,7 +165,7 @@ async def connect(self) -> None: self.connection_status.is_connecting = False self.connection_status.is_connected = True - logger.info( + self.logger.info( "Successfully connected to libp2p node {}".format(str(self.node_uri)) ) @@ -202,7 +202,7 @@ async def disconnect(self) -> None: # TOFIX(LR) mypy issue https://github.com/python/mypy/issues/8546 # self._process_messages_task = None - logger.debug("disconnecting libp2p client connection...") + self.logger.debug("disconnecting libp2p client connection...") self._writer.write_eof() await self._writer.drain() self._writer.close() @@ -212,7 +212,7 @@ async def disconnect(self) -> None: if self._in_queue is not None: self._in_queue.put_nowait(None) else: # pragma: no cover - logger.debug("Called disconnect when input queue not initialized.") + self.logger.debug("Called disconnect when input queue not initialized.") async def receive(self, *args, **kwargs) -> Optional["Envelope"]: """ @@ -224,7 +224,7 @@ async def receive(self, *args, **kwargs) -> Optional["Envelope"]: assert self._in_queue is not None, "Input queue not initialized." data = await self._in_queue.get() if data is None: - logger.debug("Received None.") + self.logger.debug("Received None.") if ( self._connection_status.is_connected or self._connection_status.is_connecting @@ -232,13 +232,13 @@ async def receive(self, *args, **kwargs) -> Optional["Envelope"]: await self.disconnect() return None # TOFIX(LR) attempt restarting the node? - logger.debug("Received data: {}".format(data)) + self.logger.debug("Received data: {}".format(data)) return Envelope.decode(data) except CancelledError: # pragma: no cover - logger.debug("Receive cancelled.") + self.logger.debug("Receive cancelled.") return None except Exception as e: # pragma: no cover # pylint: disable=broad-except - logger.exception(e) + self.logger.exception(e) return None async def send(self, envelope: Envelope): @@ -272,7 +272,7 @@ async def _send(self, data: bytes) -> None: async def _receive(self) -> Optional[bytes]: assert self._reader is not None try: - logger.debug("Waiting for messages...") + self.logger.debug("Waiting for messages...") buf = await self._reader.readexactly(4) if not buf: # pragma: no cover return None @@ -282,7 +282,7 @@ async def _receive(self) -> Optional[bytes]: return None return data except asyncio.streams.IncompleteReadError as e: - logger.info( + self.logger.info( "Connection disconnected while reading from node ({}/{})".format( len(e.partial), e.expected ) diff --git a/packages/fetchai/connections/p2p_libp2p_client/connection.yaml b/packages/fetchai/connections/p2p_libp2p_client/connection.yaml index e392759bbd..da8ab7082b 100644 --- a/packages/fetchai/connections/p2p_libp2p_client/connection.yaml +++ b/packages/fetchai/connections/p2p_libp2p_client/connection.yaml @@ -1,6 +1,6 @@ name: p2p_libp2p_client author: fetchai -version: 0.2.0 +version: 0.3.0 description: The libp2p client connection implements a tcp connection to a running libp2p node as a traffic delegate to send/receive envelopes to/from agents in the DHT. @@ -8,7 +8,7 @@ license: Apache-2.0 aea_version: '>=0.5.0, <0.6.0' fingerprint: __init__.py: QmT1FEHkPGMHV5oiVEfQHHr25N2qdZxydSNRJabJvYiTgf - connection.py: QmZfEw3G2LXEivmGu9UodJwhptcRCz3BYkRGfepuRfhGWU + connection.py: QmS74oAqsT2XPyLsvz8oFWD9bigy3wLwqJEAZ169raGXvz fingerprint_ignore_patterns: [] protocols: [] class_name: P2PLibp2pClientConnection diff --git a/packages/fetchai/connections/p2p_stub/connection.py b/packages/fetchai/connections/p2p_stub/connection.py index 4f3ec6fb2d..0b51fd1500 100644 --- a/packages/fetchai/connections/p2p_stub/connection.py +++ b/packages/fetchai/connections/p2p_stub/connection.py @@ -18,7 +18,6 @@ # ------------------------------------------------------------------------------ """This module contains the p2p stub connection.""" -import logging import os import tempfile from pathlib import Path @@ -29,9 +28,7 @@ from aea.identity.base import Identity from aea.mail.base import Envelope -logger = logging.getLogger(__name__) - -PUBLIC_ID = PublicId.from_str("fetchai/p2p_stub:0.3.0") +PUBLIC_ID = PublicId.from_str("fetchai/p2p_stub:0.4.0") class P2PStubConnection(StubConnection): diff --git a/packages/fetchai/connections/p2p_stub/connection.yaml b/packages/fetchai/connections/p2p_stub/connection.yaml index ea8b188fb7..eb5ec4d729 100644 --- a/packages/fetchai/connections/p2p_stub/connection.yaml +++ b/packages/fetchai/connections/p2p_stub/connection.yaml @@ -1,13 +1,13 @@ name: p2p_stub author: fetchai -version: 0.3.0 +version: 0.4.0 description: The stub p2p connection implements a local p2p connection allowing agents to communicate with each other through files created in the namespace directory. license: Apache-2.0 aea_version: '>=0.5.0, <0.6.0' fingerprint: __init__.py: QmW9XFKGsea4u3fupkFMcQutgsjqusCMBMyTcTmLLmQ4tR - connection.py: QmepHudxTZ77p9DDNrzdW27cU3t4nNM18SzAxH9cD8pRxY + connection.py: QmbGLdt5T3aV69HDch74DXv7an5N3nJJnxWQqgfVuHpXif fingerprint_ignore_patterns: [] protocols: [] class_name: P2PStubConnection diff --git a/packages/fetchai/connections/soef/connection.py b/packages/fetchai/connections/soef/connection.py index 1cf765e4b3..8b673ff5a8 100644 --- a/packages/fetchai/connections/soef/connection.py +++ b/packages/fetchai/connections/soef/connection.py @@ -847,15 +847,15 @@ async def receive(self, *args, **kwargs) -> Optional["Envelope"]: assert self.in_queue is not None envelope = await self.in_queue.get() if envelope is None: # pragma: nocover - logger.debug("Received None.") + self.logger.debug("Received None.") return None - logger.debug("Received envelope: {}".format(envelope)) + self.logger.debug("Received envelope: {}".format(envelope)) return envelope except CancelledError: - logger.debug("Receive cancelled.") + self.logger.debug("Receive cancelled.") return None except Exception as e: # pragma: nocover # pylint: disable=broad-except - logger.exception(e) + self.logger.exception(e) return None async def send(self, envelope: "Envelope") -> None: diff --git a/packages/fetchai/connections/soef/connection.yaml b/packages/fetchai/connections/soef/connection.yaml index 3da41faee4..1e4b885e16 100644 --- a/packages/fetchai/connections/soef/connection.yaml +++ b/packages/fetchai/connections/soef/connection.yaml @@ -6,7 +6,7 @@ license: Apache-2.0 aea_version: '>=0.5.0, <0.6.0' fingerprint: __init__.py: Qmd5VBGFJHXFe1H45XoUh5mMSYBwvLSViJuGFeMgbPdQts - connection.py: QmRiPQZMAMxzo5XaMFYmq6aewgHCqsieoj4m9hVBNJQgC8 + connection.py: QmexZbwTjVgR2E9Bk1FSjCduRSJTTpPQKLkC9hqiQ7U3B1 fingerprint_ignore_patterns: [] protocols: - fetchai/oef_search:0.3.0 diff --git a/packages/fetchai/connections/tcp/base.py b/packages/fetchai/connections/tcp/base.py index 9f207ea696..cb898f8e68 100644 --- a/packages/fetchai/connections/tcp/base.py +++ b/packages/fetchai/connections/tcp/base.py @@ -76,14 +76,14 @@ async def connect(self): :raises ConnectionError: if a problem occurred during the connection. """ if self.connection_status.is_connected: - logger.warning("Connection already set up.") + self.logger.warning("Connection already set up.") return try: await self.setup() self.connection_status.is_connected = True except Exception as e: # pragma: nocover # pylint: disable=broad-except - logger.error(str(e)) + self.logger.error(str(e)) self.connection_status.is_connected = False async def disconnect(self) -> None: @@ -93,7 +93,7 @@ async def disconnect(self) -> None: :return: None. """ if not self.connection_status.is_connected: - logger.warning("Connection already disconnected.") + self.logger.warning("Connection already disconnected.") return await self.teardown() @@ -113,9 +113,9 @@ async def _recv(self, reader: StreamReader) -> Optional[bytes]: return data async def _send(self, writer, data): - logger.debug("[{}] Send a message".format(self.address)) + self.logger.debug("[{}] Send a message".format(self.address)) nbytes = struct.pack("I", len(data)) - logger.debug("#bytes: {!r}".format(nbytes)) + self.logger.debug("#bytes: {!r}".format(nbytes)) try: writer.write(nbytes) writer.write(data) @@ -135,4 +135,6 @@ async def send(self, envelope: Envelope) -> None: data = envelope.encode() await self._send(writer, data) else: - logger.error("[{}]: Cannot send envelope {}".format(self.address, envelope)) + self.logger.error( + "[{}]: Cannot send envelope {}".format(self.address, envelope) + ) diff --git a/packages/fetchai/connections/tcp/connection.yaml b/packages/fetchai/connections/tcp/connection.yaml index 0ba2a4a52d..1882708eaa 100644 --- a/packages/fetchai/connections/tcp/connection.yaml +++ b/packages/fetchai/connections/tcp/connection.yaml @@ -6,10 +6,10 @@ license: Apache-2.0 aea_version: '>=0.5.0, <0.6.0' fingerprint: __init__.py: QmTxAtQ9ffraStxxLAkvmWxyGhoV3jE16Sw6SJ9xzTthLb - base.py: QmeStmJxHnFYuuBqx6hx9NgrW6BTNCmRgEeGewnm8jvQyc + base.py: QmaVguH7JgvqdBMMZMf1xwH7m736KhnCpATwFQ7EZiX4JE connection.py: QmTFkiw3JLmhEM6CKRpKjv9Y32nuCQevZ2gVKoQ4gExeW9 - tcp_client.py: QmeBe8E9zofdzochVRJLg6m5CmNprnCSTmW3NqUYp49pEL - tcp_server.py: QmY7TRJnBiut6BJqpgYuwQvjHRG3xLePjxKDw7ffzr16Vc + tcp_client.py: QmTXs6z3rvxB59FmGuu46CeY1eHRPBNQ4CPZm1y7hRpusp + tcp_server.py: QmPP89GpcQpc8Ptfdgf74tgTuqWmLDRXQ6vA8rwWDtiiwh fingerprint_ignore_patterns: [] protocols: [] class_name: TCPClientConnection diff --git a/packages/fetchai/connections/tcp/tcp_client.py b/packages/fetchai/connections/tcp/tcp_client.py index 4ce343f1b8..906c50bb0c 100644 --- a/packages/fetchai/connections/tcp/tcp_client.py +++ b/packages/fetchai/connections/tcp/tcp_client.py @@ -34,7 +34,7 @@ from packages.fetchai.connections.tcp.base import TCPConnection -logger = logging.getLogger("aea.packages.fetchai.connections.tcp_client") +logger = logging.getLogger("aea.packages.fetchai.connections.tcp.tcp_client") STUB_DIALOGUE_ID = 0 @@ -86,20 +86,22 @@ async def receive(self, *args, **kwargs) -> Optional["Envelope"]: assert self._reader is not None data = await self._recv(self._reader) if data is None: # pragma: nocover - logger.debug("[{}] No data received.".format(self.address)) + self.logger.debug("[{}] No data received.".format(self.address)) return None - logger.debug("[{}] Message received: {!r}".format(self.address, data)) + self.logger.debug("[{}] Message received: {!r}".format(self.address, data)) envelope = Envelope.decode(data) # TODO handle decoding error - logger.debug("[{}] Decoded envelope: {}".format(self.address, envelope)) + self.logger.debug( + "[{}] Decoded envelope: {}".format(self.address, envelope) + ) return envelope except CancelledError: - logger.debug("[{}] Read cancelled.".format(self.address)) + self.logger.debug("[{}] Read cancelled.".format(self.address)) return None except struct.error as e: - logger.debug("Struct error: {}".format(str(e))) + self.logger.debug("Struct error: {}".format(str(e))) return None except Exception as e: - logger.exception(e) + self.logger.exception(e) raise def select_writer_from_envelope(self, envelope: Envelope) -> Optional[StreamWriter]: diff --git a/packages/fetchai/connections/tcp/tcp_server.py b/packages/fetchai/connections/tcp/tcp_server.py index 598380fdb6..f8ba05a575 100644 --- a/packages/fetchai/connections/tcp/tcp_server.py +++ b/packages/fetchai/connections/tcp/tcp_server.py @@ -60,12 +60,12 @@ async def handle(self, reader: StreamReader, writer: StreamWriter) -> None: :param writer: the stream writer. :return: None """ - logger.debug("Waiting for client address...") + self.logger.debug("Waiting for client address...") address_bytes = await self._recv(reader) if address_bytes is not None: address_bytes = cast(bytes, address_bytes) address = address_bytes.decode("utf-8") - logger.debug("Public key of the client: {}".format(address)) + self.logger.debug("Public key of the client: {}".format(address)) self.connections[address] = (reader, writer) read_task = asyncio.ensure_future(self._recv(reader), loop=self._loop) self._read_tasks_to_address[read_task] = address @@ -77,20 +77,20 @@ async def receive(self, *args, **kwargs) -> Optional["Envelope"]: :return: the received envelope, or None if an error occurred. """ if len(self._read_tasks_to_address) == 0: - logger.warning( + self.logger.warning( "Tried to read from the TCP server. However, there is no open connection to read from." ) return None try: - logger.debug("Waiting for incoming messages...") + self.logger.debug("Waiting for incoming messages...") done, _ = await asyncio.wait(self._read_tasks_to_address.keys(), return_when=asyncio.FIRST_COMPLETED) # type: ignore # take the first task = next(iter(done)) envelope_bytes = task.result() if envelope_bytes is None: # pragma: no cover - logger.debug("[{}]: No data received.") + self.logger.debug("[{}]: No data received.") return None envelope = Envelope.decode(envelope_bytes) address = self._read_tasks_to_address.pop(task) @@ -99,10 +99,10 @@ async def receive(self, *args, **kwargs) -> Optional["Envelope"]: self._read_tasks_to_address[new_task] = address return envelope except asyncio.CancelledError: - logger.debug("Receiving loop cancelled.") + self.logger.debug("Receiving loop cancelled.") return None except Exception as e: # pragma: nocover # pylint: disable=broad-except - logger.error("Error in the receiving loop: {}".format(str(e))) + self.logger.error("Error in the receiving loop: {}".format(str(e))) return None async def setup(self): @@ -110,7 +110,7 @@ async def setup(self): self._server = await asyncio.start_server( self.handle, host=self.host, port=self.port ) - logger.debug("Start listening on {}:{}".format(self.host, self.port)) + self.logger.debug("Start listening on {}:{}".format(self.host, self.port)) async def teardown(self): """Tear the connection down.""" diff --git a/packages/fetchai/connections/webhook/connection.py b/packages/fetchai/connections/webhook/connection.py index ca759ec54a..e646187c68 100644 --- a/packages/fetchai/connections/webhook/connection.py +++ b/packages/fetchai/connections/webhook/connection.py @@ -37,7 +37,7 @@ NOT_FOUND = 404 REQUEST_TIMEOUT = 408 SERVER_ERROR = 500 -PUBLIC_ID = PublicId.from_str("fetchai/webhook:0.3.0") +PUBLIC_ID = PublicId.from_str("fetchai/webhook:0.4.0") logger = logging.getLogger("aea.packages.fetchai.connections.webhook") @@ -78,7 +78,8 @@ def __init__( self.connection_id = connection_id self.in_queue = None # type: Optional[asyncio.Queue] # pragma: no cover - logger.info("Initialised a webhook channel") + self.logger = logger + self.logger.info("Initialised a webhook channel") async def connect(self) -> None: """ @@ -211,6 +212,7 @@ async def connect(self) -> None: """ if not self.connection_status.is_connected: self.connection_status.is_connected = True + self.channel.logger = self.logger self.channel.in_queue = asyncio.Queue() await self.channel.connect() diff --git a/packages/fetchai/connections/webhook/connection.yaml b/packages/fetchai/connections/webhook/connection.yaml index fb696dccd9..62245d007a 100644 --- a/packages/fetchai/connections/webhook/connection.yaml +++ b/packages/fetchai/connections/webhook/connection.yaml @@ -1,12 +1,12 @@ name: webhook author: fetchai -version: 0.3.0 +version: 0.4.0 description: The webhook connection that wraps a webhook functionality. license: Apache-2.0 aea_version: '>=0.5.0, <0.6.0' fingerprint: __init__.py: QmWUKSmXaBgGMvKgdmzKmMjCx43BnrfW6og2n3afNoAALq - connection.py: QmZuRpeuoa1sx5UTZtVsYh5RqnyreoinhTP2jXXVHzy3A6 + connection.py: QmeGqgig7Ab95znNf2kBHukAjbsaofFX24SYRaDreEwn9V fingerprint_ignore_patterns: [] protocols: - fetchai/http:0.3.0 diff --git a/packages/hashes.csv b/packages/hashes.csv index 17a52dae3a..13a7c74488 100644 --- a/packages/hashes.csv +++ b/packages/hashes.csv @@ -1,5 +1,5 @@ -fetchai/agents/aries_alice,QmZ2BMvWdC1Am9jqQrZetBPnJTVhpR2UBQGoG3cRcBC7YL -fetchai/agents/aries_faber,QmdFrjCBja3NZXrcBRvvVSJKK3aHzJRwxs2k6XRDhbXBCM +fetchai/agents/aries_alice,QmTaUncpiJipj8UrpTmR1JJqDSkWofLBC9cY1QTpKYEkEV +fetchai/agents/aries_faber,QmUTaZb1vYDbfELBUKwwRxudPMkHW2JRngqSjiBZnxfr2g fetchai/agents/car_data_buyer,QmXKuZ3dqpaT4GUPRXKJiejRepCjL7iuF1XTM99wq4a5nL fetchai/agents/car_detector,QmRFWhSLY22iRoT4CwBfLmsYX7fUi9VGS9JHckUTmxd4aQ fetchai/agents/erc1155_client,QmQtMv3FwHYt74NNemFKnS4iKo1U1Z9QvfN76Fmnccqz1q @@ -18,21 +18,21 @@ fetchai/agents/thermometer_aea,QmSfGkjDkvL3L2JvmEJJ9sdhd1xFFVL8epfzGFtDBaBd3q fetchai/agents/thermometer_client,QmbcVyNwpHAwY8NPcgd2bdDpGhLLZLTyg6o78o67RXrNC1 fetchai/agents/weather_client,QmemjFHEFE32mXjP48NEX7prqaAHfW9wTM8mBAPfM4dUeA fetchai/agents/weather_station,QmQ8vVjVB4xDqjZwd5SH2skaqXFMkkBSX69j7VXM3ru2ez -fetchai/connections/gym,QmZNEJvgi9n5poswQrHav3fvSv5vA1nbxxkTzWENCoCdrc -fetchai/connections/http_client,QmXQrA6gA4hMEMkMQsEp1MQwDEqRw5BnnqR4gCrP5xqVD2 -fetchai/connections/http_server,QmPMSyX1iaWM7mWqFtW8LnSyR9r88RzYbGtyYmopT6tshC +fetchai/connections/gym,QmXpTer28dVvxeXqsXzaBqX551QToh9w5KJC2oXcStpKJG +fetchai/connections/http_client,QmUjtATHombNqbwHRonc3pLUTfuvQJBxqGAj4K5zKT8beQ +fetchai/connections/http_server,QmXuGssPAahvRXHNmYrvtqYokgeCqavoiK7x9zmjQT8w23 fetchai/connections/ledger,QmVXceMJCioA1Hro9aJgBwrF9yLgToaVXifDz6EVo6vTXn -fetchai/connections/local,QmVcTEJxGbWbtXi2fLN5eJA6XuEAneaNd83UJPugrtb9xU -fetchai/connections/oef,QmdkQ9hUbJ8HsJD5qxSPRae9s2G9LZXFhfJabeHBVVYMJi -fetchai/connections/p2p_client,QmbwCDuAB1eq6JikqeAAqpqjVhxevGNeWCLqRD67Uvqiaz -fetchai/connections/p2p_libp2p,QmT5sbnNb4mz3EYbRJBatcGEPht8s2QWGsG98unVSfWuhJ -fetchai/connections/p2p_libp2p_client,QmY4vR6r4XqqWw25Q3bTmPcXMcaVAkAs3RJjEWyVEe81kv -fetchai/connections/p2p_stub,QmSBRr26YELdbYk9nAurw3XdQ3Myj7cVgCDZZMv7DMrsdg +fetchai/connections/local,QmZKciQTgE8LLHsgQX4F5Ecc7rNPp9BBSWQHEEe7jEMEmJ +fetchai/connections/oef,QmWcT6NA3jCsngAiEuCjLtWumGKScS6PrjngvGgLJXg9TK +fetchai/connections/p2p_client,QmPHaZFxqyP6Vu7N81Lz4ig76FGQQ2HJW7MukhvpF22XoP +fetchai/connections/p2p_libp2p,QmRAxV3HutV9d2komrFxaQjcy2AQjo9WwPA1hvEiKtZiRY +fetchai/connections/p2p_libp2p_client,QmPXFJHdeyRYo9dPuSJwixfLaJJ71scKv2pD9qJoRqRkZo +fetchai/connections/p2p_stub,QmTFcniXvpUw5hR27SN1W1iLcW8eGsMzFvzPQ4s3g3bw3H fetchai/connections/scaffold,QmTzEeEydjohZNTsAJnoGMtzTgCyzMBQCYgbTBLfqWtw5w -fetchai/connections/soef,QmQ2ipCqMNcP6hhQwspUuLQcV6RDCCfyXjNqFS8AFcsdpf +fetchai/connections/soef,QmXVgfnfGWYhbfb2qHDpqj9kkhsaqZCzLpgcEQiYMwctCS fetchai/connections/stub,QmWP6tgcttnUY86ynAseyHuuFT85edT31QPSyideVveiyj -fetchai/connections/tcp,QmaMM1h9KzpB43U38kH6uVhnHJC3GcTw6QYguD2QA5N2i9 -fetchai/connections/webhook,QmZ3vofEwRBZPvMCxLVanSnsewXTdK5nHyWiDWjzFUbTRy +fetchai/connections/tcp,QmemFigK3M5AZySQ4R8Lb6acMKhSVh1LY2Q9baMD3hU72a +fetchai/connections/webhook,QmZqPmyD36hmowzUrV4MsjXjXM6GXYJuZjKg9r1XUMeGxW fetchai/contracts/erc1155,QmPEae32YqmCmB7nAzoLokosvnu3u8ZN75xouzZEBvE5zM fetchai/contracts/scaffold,Qme97drP4cwCyPs3zV6WaLz9K7c5ZWRtSWQ25hMUmMjFgo fetchai/protocols/contract_api,QmcveAM85xPuhv2Dmo63adnhh5zgFVjPpPYQFEtKWxXvKj diff --git a/tests/conftest.py b/tests/conftest.py index 9b357b73c6..746fa7a431 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -159,7 +159,7 @@ UNKNOWN_SKILL_PUBLIC_ID = PublicId("unknown_author", "unknown_skill", "0.1.0") LOCAL_CONNECTION_PUBLIC_ID = PublicId("fetchai", "local", "0.1.0") P2P_CLIENT_CONNECTION_PUBLIC_ID = PublicId("fetchai", "p2p_client", "0.1.0") -HTTP_CLIENT_CONNECTION_PUBLIC_ID = PublicId.from_str("fetchai/http_client:0.4.0") +HTTP_CLIENT_CONNECTION_PUBLIC_ID = PublicId.from_str("fetchai/http_client:0.5.0") HTTP_PROTOCOL_PUBLIC_ID = PublicId("fetchai", "http", "0.1.0") STUB_CONNECTION_PUBLIC_ID = DEFAULT_CONNECTION DUMMY_PROTOCOL_PUBLIC_ID = PublicId("dummy_author", "dummy", "0.1.0") @@ -670,6 +670,7 @@ def _make_oef_connection(address: Address, oef_addr: str, oef_port: int): oef_connection = OEFConnection( configuration=configuration, identity=Identity("name", address), ) + oef_connection._default_logger_name = "aea.packages.fetchai.connections.oef" return oef_connection @@ -680,6 +681,9 @@ def _make_tcp_server_connection(address: str, host: str, port: int): tcp_connection = TCPServerConnection( configuration=configuration, identity=Identity("name", address), ) + tcp_connection._default_logger_name = ( + "aea.packages.fetchai.connections.tcp.tcp_server" + ) return tcp_connection @@ -690,6 +694,9 @@ def _make_tcp_client_connection(address: str, host: str, port: int): tcp_connection = TCPClientConnection( configuration=configuration, identity=Identity("name", address), ) + tcp_connection._default_logger_name = ( + "aea.packages.fetchai.connections.tcp.tcp_client" + ) return tcp_connection diff --git a/tests/data/dummy_aea/aea-config.yaml b/tests/data/dummy_aea/aea-config.yaml index 8ba94c2399..3aa76cf102 100644 --- a/tests/data/dummy_aea/aea-config.yaml +++ b/tests/data/dummy_aea/aea-config.yaml @@ -7,7 +7,7 @@ aea_version: '>=0.5.0, <0.6.0' fingerprint: {} fingerprint_ignore_patterns: [] connections: -- fetchai/local:0.3.0 +- fetchai/local:0.4.0 contracts: - fetchai/erc1155:0.6.0 protocols: @@ -16,7 +16,7 @@ protocols: skills: - dummy_author/dummy:0.1.0 - fetchai/error:0.3.0 -default_connection: fetchai/local:0.3.0 +default_connection: fetchai/local:0.4.0 default_ledger: fetchai ledger_apis: ethereum: diff --git a/tests/data/hashes.csv b/tests/data/hashes.csv index 41ef5ff216..5d24763182 100644 --- a/tests/data/hashes.csv +++ b/tests/data/hashes.csv @@ -1,4 +1,4 @@ -dummy_author/agents/dummy_aea,QmdAYhHqLnq8Z9nUhWDnSSwuf58d48DVC1ZpXyba6c4imF +dummy_author/agents/dummy_aea,QmZsWMsRQXAjZ7adp3WrM3DerKsiKeRCudz7QZPtA7LxZP dummy_author/skills/dummy_skill,Qme2ehYviSzGVKNZfS5N7A7Jayd7QJ4nn9EEnXdVrL231X fetchai/connections/dummy_connection,QmVAEYzswDE7CxEKQpz51f8GV7UVm7WE6AHZGqWj9QMMUK fetchai/contracts/dummy_contract,QmTBc9MJrKa66iRmvfHKpR1xmT6P5cGML5S5RUsW6yVwbm diff --git a/tests/test_aea.py b/tests/test_aea.py index 67e4d151f8..4bb55bc80f 100644 --- a/tests/test_aea.py +++ b/tests/test_aea.py @@ -129,10 +129,10 @@ def test_react(): builder.add_connection( Path(ROOT_DIR, "packages", "fetchai", "connections", "local") ) - local_connection_id = PublicId.from_str("fetchai/local:0.3.0") + local_connection_id = PublicId.from_str("fetchai/local:0.4.0") builder.set_default_connection(local_connection_id) builder.add_skill(Path(CUR_PATH, "data", "dummy_skill")) - agent = builder.build(connection_ids=[PublicId.from_str("fetchai/local:0.3.0")]) + agent = builder.build(connection_ids=[PublicId.from_str("fetchai/local:0.4.0")]) # This is a temporary workaround to feed the local node to the OEF Local connection # TODO remove it. local_connection = agent.resources.get_connection(local_connection_id) @@ -186,10 +186,10 @@ def test_handle(): builder.add_connection( Path(ROOT_DIR, "packages", "fetchai", "connections", "local") ) - local_connection_id = PublicId.from_str("fetchai/local:0.3.0") + local_connection_id = PublicId.from_str("fetchai/local:0.4.0") builder.set_default_connection(local_connection_id) builder.add_skill(Path(CUR_PATH, "data", "dummy_skill")) - aea = builder.build(connection_ids=[PublicId.from_str("fetchai/local:0.3.0")]) + aea = builder.build(connection_ids=[PublicId.from_str("fetchai/local:0.4.0")]) # This is a temporary workaround to feed the local node to the OEF Local connection # TODO remove it. local_connection = aea.resources.get_connection(local_connection_id) @@ -272,10 +272,10 @@ def test_initialize_aea_programmatically(): builder.add_connection( Path(ROOT_DIR, "packages", "fetchai", "connections", "local") ) - local_connection_id = PublicId.from_str("fetchai/local:0.3.0") + local_connection_id = PublicId.from_str("fetchai/local:0.4.0") builder.set_default_connection(local_connection_id) builder.add_skill(Path(CUR_PATH, "data", "dummy_skill")) - aea = builder.build(connection_ids=[PublicId.from_str("fetchai/local:0.3.0")]) + aea = builder.build(connection_ids=[PublicId.from_str("fetchai/local:0.4.0")]) local_connection = aea.resources.get_connection(local_connection_id) local_connection._local_node = node diff --git a/tests/test_cli/test_add/test_connection.py b/tests/test_cli/test_add/test_connection.py index 8e762be4cf..49e26a00fa 100644 --- a/tests/test_cli/test_add/test_connection.py +++ b/tests/test_cli/test_add/test_connection.py @@ -60,7 +60,7 @@ def setup_class(cls): cls.connection_name = "http_client" cls.connection_author = "fetchai" cls.connection_version = "0.3.0" - cls.connection_id = "fetchai/http_client:0.4.0" + cls.connection_id = "fetchai/http_client:0.5.0" # copy the 'packages' directory in the parent of the agent folder. shutil.copytree(Path(CUR_PATH, "..", "packages"), Path(cls.t, "packages")) @@ -151,7 +151,7 @@ def setup_class(cls): cls.connection_name = "http_client" cls.connection_author = "fetchai" cls.connection_version = "0.3.0" - cls.connection_id = "fetchai/http_client:0.4.0" + cls.connection_id = "fetchai/http_client:0.5.0" # copy the 'packages' directory in the parent of the agent folder. shutil.copytree(Path(CUR_PATH, "..", "packages"), Path(cls.t, "packages")) @@ -348,7 +348,7 @@ def setup_class(cls): cls.agent_name = "myagent" cls.cwd = os.getcwd() cls.t = tempfile.mkdtemp() - cls.connection_id = "fetchai/http_client:0.4.0" + cls.connection_id = "fetchai/http_client:0.5.0" cls.connection_name = "http_client" # copy the 'packages' directory in the parent of the agent folder. @@ -416,7 +416,7 @@ def setup_class(cls): cls.agent_name = "myagent" cls.cwd = os.getcwd() cls.t = tempfile.mkdtemp() - cls.connection_id = "fetchai/http_client:0.4.0" + cls.connection_id = "fetchai/http_client:0.5.0" cls.connection_name = "http_client" # copy the 'packages' directory in the parent of the agent folder. diff --git a/tests/test_cli/test_remove/test_connection.py b/tests/test_cli/test_remove/test_connection.py index 6615479dfd..b7435a4a7b 100644 --- a/tests/test_cli/test_remove/test_connection.py +++ b/tests/test_cli/test_remove/test_connection.py @@ -49,7 +49,7 @@ def setup_class(cls): cls.t = tempfile.mkdtemp() # copy the 'packages' directory in the parent of the agent folder. shutil.copytree(Path(CUR_PATH, "..", "packages"), Path(cls.t, "packages")) - cls.connection_id = "fetchai/http_client:0.4.0" + cls.connection_id = "fetchai/http_client:0.5.0" cls.connection_name = "http_client" os.chdir(cls.t) @@ -111,7 +111,7 @@ def setup_class(cls): cls.agent_name = "myagent" cls.cwd = os.getcwd() cls.t = tempfile.mkdtemp() - cls.connection_id = "fetchai/local:0.3.0" + cls.connection_id = "fetchai/local:0.4.0" os.chdir(cls.t) result = cls.runner.invoke( @@ -166,7 +166,7 @@ def setup_class(cls): cls.t = tempfile.mkdtemp() # copy the 'packages' directory in the parent of the agent folder. shutil.copytree(Path(CUR_PATH, "..", "packages"), Path(cls.t, "packages")) - cls.connection_id = "fetchai/http_client:0.4.0" + cls.connection_id = "fetchai/http_client:0.5.0" cls.connection_name = "http_client" os.chdir(cls.t) diff --git a/tests/test_cli/test_run.py b/tests/test_cli/test_run.py index 07231debcf..4a23672bd2 100644 --- a/tests/test_cli/test_run.py +++ b/tests/test_cli/test_run.py @@ -75,7 +75,7 @@ def test_run(): result = runner.invoke( cli, - [*CLI_LOG_OPTION, "add", "--local", "connection", "fetchai/http_client:0.4.0"], + [*CLI_LOG_OPTION, "add", "--local", "connection", "fetchai/http_client:0.5.0"], ) assert result.exit_code == 0 @@ -86,7 +86,7 @@ def test_run(): "config", "set", "agent.default_connection", - "fetchai/http_client:0.4.0", + "fetchai/http_client:0.5.0", ], ) assert result.exit_code == 0 @@ -167,9 +167,9 @@ def test_run_with_default_connection(): @pytest.mark.parametrize( argnames=["connection_ids"], argvalues=[ - ["fetchai/http_client:0.4.0,{}".format(str(DEFAULT_CONNECTION))], - ["'fetchai/http_client:0.4.0, {}'".format(str(DEFAULT_CONNECTION))], - ["fetchai/http_client:0.4.0,,{},".format(str(DEFAULT_CONNECTION))], + ["fetchai/http_client:0.5.0,{}".format(str(DEFAULT_CONNECTION))], + ["'fetchai/http_client:0.5.0, {}'".format(str(DEFAULT_CONNECTION))], + ["fetchai/http_client:0.5.0,,{},".format(str(DEFAULT_CONNECTION))], ], ) def test_run_multiple_connections(connection_ids): @@ -194,7 +194,7 @@ def test_run_multiple_connections(connection_ids): result = runner.invoke( cli, - [*CLI_LOG_OPTION, "add", "--local", "connection", "fetchai/http_client:0.4.0"], + [*CLI_LOG_OPTION, "add", "--local", "connection", "fetchai/http_client:0.5.0"], ) assert result.exit_code == 0 @@ -252,7 +252,7 @@ def test_run_unknown_private_key(): result = runner.invoke( cli, - [*CLI_LOG_OPTION, "add", "--local", "connection", "fetchai/http_client:0.4.0"], + [*CLI_LOG_OPTION, "add", "--local", "connection", "fetchai/http_client:0.5.0"], ) assert result.exit_code == 0 result = runner.invoke( @@ -262,7 +262,7 @@ def test_run_unknown_private_key(): "config", "set", "agent.default_connection", - "fetchai/http_client:0.4.0", + "fetchai/http_client:0.5.0", ], ) assert result.exit_code == 0 @@ -291,7 +291,7 @@ def test_run_unknown_private_key(): result = runner.invoke( cli, - [*CLI_LOG_OPTION, "run", "--connections", "fetchai/http_client:0.4.0"], + [*CLI_LOG_OPTION, "run", "--connections", "fetchai/http_client:0.5.0"], standalone_mode=False, ) @@ -328,7 +328,7 @@ def test_run_unknown_private_key(): # result = runner.invoke( # cli, -# [*CLI_LOG_OPTION, "add", "--local", "connection", "fetchai/http_client:0.4.0"], +# [*CLI_LOG_OPTION, "add", "--local", "connection", "fetchai/http_client:0.5.0"], # ) # assert result.exit_code == 0 # result = runner.invoke( @@ -338,7 +338,7 @@ def test_run_unknown_private_key(): # "config", # "set", # "agent.default_connection", -# "fetchai/http_client:0.4.0", +# "fetchai/http_client:0.5.0", # ], # ) # assert result.exit_code == 0 @@ -367,7 +367,7 @@ def test_run_unknown_private_key(): # result = runner.invoke( # cli, -# [*CLI_LOG_OPTION, "run", "--connections", "fetchai/http_client:0.4.0"], +# [*CLI_LOG_OPTION, "run", "--connections", "fetchai/http_client:0.5.0"], # standalone_mode=False, # ) @@ -403,7 +403,7 @@ def test_run_fet_private_key_config(): result = runner.invoke( cli, - [*CLI_LOG_OPTION, "add", "--local", "connection", "fetchai/http_client:0.4.0"], + [*CLI_LOG_OPTION, "add", "--local", "connection", "fetchai/http_client:0.5.0"], ) assert result.exit_code == 0 @@ -427,7 +427,7 @@ def test_run_fet_private_key_config(): error_msg = "" try: - cli.main([*CLI_LOG_OPTION, "run", "--connections", "fetchai/http_client:0.4.0"]) + cli.main([*CLI_LOG_OPTION, "run", "--connections", "fetchai/http_client:0.5.0"]) except SystemExit as e: error_msg = str(e) @@ -462,7 +462,7 @@ def test_run_ethereum_private_key_config(): result = runner.invoke( cli, - [*CLI_LOG_OPTION, "add", "--local", "connection", "fetchai/http_client:0.4.0"], + [*CLI_LOG_OPTION, "add", "--local", "connection", "fetchai/http_client:0.5.0"], ) assert result.exit_code == 0 @@ -486,7 +486,7 @@ def test_run_ethereum_private_key_config(): error_msg = "" try: - cli.main([*CLI_LOG_OPTION, "run", "--connections", "fetchai/http_client:0.4.0"]) + cli.main([*CLI_LOG_OPTION, "run", "--connections", "fetchai/http_client:0.5.0"]) except SystemExit as e: error_msg = str(e) @@ -522,7 +522,7 @@ def test_run_ledger_apis(): result = runner.invoke( cli, - [*CLI_LOG_OPTION, "add", "--local", "connection", "fetchai/http_client:0.4.0"], + [*CLI_LOG_OPTION, "add", "--local", "connection", "fetchai/http_client:0.5.0"], ) assert result.exit_code == 0 result = runner.invoke( @@ -532,7 +532,7 @@ def test_run_ledger_apis(): "config", "set", "agent.default_connection", - "fetchai/http_client:0.4.0", + "fetchai/http_client:0.5.0", ], ) assert result.exit_code == 0 @@ -569,7 +569,7 @@ def test_run_ledger_apis(): "aea.cli", "run", "--connections", - "fetchai/http_client:0.4.0", + "fetchai/http_client:0.5.0", ], stdout=subprocess.PIPE, env=os.environ.copy(), @@ -618,7 +618,7 @@ def test_run_fet_ledger_apis(): result = runner.invoke( cli, - [*CLI_LOG_OPTION, "add", "--local", "connection", "fetchai/http_client:0.4.0"], + [*CLI_LOG_OPTION, "add", "--local", "connection", "fetchai/http_client:0.5.0"], ) assert result.exit_code == 0 result = runner.invoke( @@ -628,7 +628,7 @@ def test_run_fet_ledger_apis(): "config", "set", "agent.default_connection", - "fetchai/http_client:0.4.0", + "fetchai/http_client:0.5.0", ], ) assert result.exit_code == 0 @@ -662,7 +662,7 @@ def test_run_fet_ledger_apis(): "aea.cli", "run", "--connections", - "fetchai/http_client:0.4.0", + "fetchai/http_client:0.5.0", ], stdout=subprocess.PIPE, env=os.environ.copy(), @@ -712,7 +712,7 @@ def test_run_with_install_deps(): result = runner.invoke( cli, - [*CLI_LOG_OPTION, "add", "--local", "connection", "fetchai/http_client:0.4.0"], + [*CLI_LOG_OPTION, "add", "--local", "connection", "fetchai/http_client:0.5.0"], ) assert result.exit_code == 0 result = runner.invoke( @@ -722,7 +722,7 @@ def test_run_with_install_deps(): "config", "set", "agent.default_connection", - "fetchai/http_client:0.4.0", + "fetchai/http_client:0.5.0", ], ) assert result.exit_code == 0 @@ -738,7 +738,7 @@ def test_run_with_install_deps(): "run", "--install-deps", "--connections", - "fetchai/http_client:0.4.0", + "fetchai/http_client:0.5.0", ], env=os.environ, maxread=10000, @@ -784,7 +784,7 @@ def test_run_with_install_deps_and_requirement_file(): result = runner.invoke( cli, - [*CLI_LOG_OPTION, "add", "--local", "connection", "fetchai/http_client:0.4.0"], + [*CLI_LOG_OPTION, "add", "--local", "connection", "fetchai/http_client:0.5.0"], ) assert result.exit_code == 0 result = runner.invoke( @@ -794,7 +794,7 @@ def test_run_with_install_deps_and_requirement_file(): "config", "set", "agent.default_connection", - "fetchai/http_client:0.4.0", + "fetchai/http_client:0.5.0", ], ) assert result.exit_code == 0 @@ -814,7 +814,7 @@ def test_run_with_install_deps_and_requirement_file(): "run", "--install-deps", "--connections", - "fetchai/http_client:0.4.0", + "fetchai/http_client:0.5.0", ], env=os.environ, maxread=10000, @@ -872,7 +872,7 @@ def setup_class(cls): "add", "--local", "connection", - "fetchai/http_client:0.4.0", + "fetchai/http_client:0.5.0", ], standalone_mode=False, ) @@ -889,7 +889,7 @@ def setup_class(cls): try: cli.main( - [*CLI_LOG_OPTION, "run", "--connections", "fetchai/http_client:0.4.0"] + [*CLI_LOG_OPTION, "run", "--connections", "fetchai/http_client:0.5.0"] ) except SystemExit as e: cls.exit_code = e.code @@ -1084,7 +1084,7 @@ def setup_class(cls): """Set the test up.""" cls.runner = CliRunner() cls.agent_name = "myagent" - cls.connection_id = PublicId.from_str("fetchai/http_client:0.4.0") + cls.connection_id = PublicId.from_str("fetchai/http_client:0.5.0") cls.connection_name = cls.connection_id.name cls.connection_author = cls.connection_id.author cls.cwd = os.getcwd() @@ -1118,7 +1118,7 @@ def setup_class(cls): "config", "set", "agent.default_connection", - "fetchai/http_client:0.4.0", + "fetchai/http_client:0.5.0", ], ) assert result.exit_code == 0 @@ -1177,7 +1177,7 @@ def setup_class(cls): """Set the test up.""" cls.runner = CliRunner() cls.agent_name = "myagent" - cls.connection_id = PublicId.from_str("fetchai/http_client:0.4.0") + cls.connection_id = PublicId.from_str("fetchai/http_client:0.5.0") cls.connection_author = cls.connection_id.author cls.connection_name = cls.connection_id.name cls.cwd = os.getcwd() @@ -1211,7 +1211,7 @@ def setup_class(cls): "config", "set", "agent.default_connection", - "fetchai/http_client:0.4.0", + "fetchai/http_client:0.5.0", ], ) assert result.exit_code == 0 @@ -1269,7 +1269,7 @@ def setup_class(cls): """Set the test up.""" cls.runner = CliRunner() cls.agent_name = "myagent" - cls.connection_id = "fetchai/http_client:0.4.0" + cls.connection_id = "fetchai/http_client:0.5.0" cls.connection_name = "http_client" cls.cwd = os.getcwd() cls.t = tempfile.mkdtemp() @@ -1302,7 +1302,7 @@ def setup_class(cls): "config", "set", "agent.default_connection", - "fetchai/http_client:0.4.0", + "fetchai/http_client:0.5.0", ], ) assert result.exit_code == 0 diff --git a/tests/test_cli_gui/test_run_agent.py b/tests/test_cli_gui/test_run_agent.py index 04b366a81c..0e840f7ade 100644 --- a/tests/test_cli_gui/test_run_agent.py +++ b/tests/test_cli_gui/test_run_agent.py @@ -64,7 +64,7 @@ def test_create_and_run_agent(): response_add = app.post( "api/agent/" + agent_id + "/connection", content_type="application/json", - data=json.dumps("fetchai/local:0.3.0"), + data=json.dumps("fetchai/local:0.4.0"), ) assert response_add.status_code == 201 diff --git a/tests/test_docs/test_bash_yaml/md_files/bash-aries-cloud-agent-demo.md b/tests/test_docs/test_bash_yaml/md_files/bash-aries-cloud-agent-demo.md index 3d1e3ff22a..0e6fbad6c3 100644 --- a/tests/test_docs/test_bash_yaml/md_files/bash-aries-cloud-agent-demo.md +++ b/tests/test_docs/test_bash_yaml/md_files/bash-aries-cloud-agent-demo.md @@ -30,8 +30,8 @@ aea config set --type int vendor.fetchai.skills.aries_alice.handlers.aries_demo_ aea config set --type int vendor.fetchai.skills.aries_alice.handlers.aries_demo_http.args.admin_port 8031 ``` ``` bash -aea add connection fetchai/http_client:0.4.0 -aea add connection fetchai/webhook:0.3.0 +aea add connection fetchai/http_client:0.5.0 +aea add connection fetchai/webhook:0.4.0 aea add connection fetchai/oef:0.6.0 ``` ``` bash @@ -97,8 +97,8 @@ aea config set --type int vendor.fetchai.skills.aries_faber.handlers.aries_demo_ aea config set vendor.fetchai.skills.aries_faber.handlers.aries_demo_http.args.alice_id ``` ``` bash -aea add connection fetchai/http_client:0.4.0 -aea add connection fetchai/webhook:0.3.0 +aea add connection fetchai/http_client:0.5.0 +aea add connection fetchai/webhook:0.4.0 aea add connection fetchai/oef:0.6.0 ``` ``` bash @@ -108,7 +108,7 @@ aea config set --type int vendor.fetchai.connections.webhook.config.webhook_port aea config set vendor.fetchai.connections.webhook.config.webhook_url_path /webhooks/topic/{topic}/ ``` ``` bash -aea config set agent.default_connection fetchai/http_client:0.4.0 +aea config set agent.default_connection fetchai/http_client:0.5.0 ``` ``` bash aea fetch fetchai/aries_faber:0.5.0 diff --git a/tests/test_docs/test_bash_yaml/md_files/bash-http-connection-and-skill.md b/tests/test_docs/test_bash_yaml/md_files/bash-http-connection-and-skill.md index 1e899022b1..38af6c6f41 100644 --- a/tests/test_docs/test_bash_yaml/md_files/bash-http-connection-and-skill.md +++ b/tests/test_docs/test_bash_yaml/md_files/bash-http-connection-and-skill.md @@ -3,10 +3,10 @@ aea create my_aea cd my_aea ``` ``` bash -aea add connection fetchai/http_server:0.4.0 +aea add connection fetchai/http_server:0.5.0 ``` ``` bash -aea config set agent.default_connection fetchai/http_server:0.4.0 +aea config set agent.default_connection fetchai/http_server:0.5.0 ``` ``` bash aea config set vendor.fetchai.connections.http_server.config.api_spec_path "../examples/http_ex/petstore.yaml" diff --git a/tests/test_packages/test_connections/test_oef/test_communication.py b/tests/test_packages/test_connections/test_oef/test_communication.py index 136a4967fc..715b4b3699 100644 --- a/tests/test_packages/test_connections/test_oef/test_communication.py +++ b/tests/test_packages/test_connections/test_oef/test_communication.py @@ -53,7 +53,6 @@ from aea.protocols.default.message import DefaultMessage from aea.test_tools.test_cases import UseOef -import packages from packages.fetchai.connections.oef.connection import OEFObjectTranslator from packages.fetchai.protocols.fipa import fipa_pb2 from packages.fetchai.protocols.fipa.message import FipaMessage @@ -1063,7 +1062,7 @@ async def test_send_oef_message(self, pytestconfig): await oef_connection.disconnect() @pytest.mark.asyncio - async def test_cancelled_receive(self, pytestconfig): + async def test_cancelled_receive(self, pytestconfig, caplog): """Test the case when a receive request is cancelled.""" oef_connection = _make_oef_connection( address=FETCHAI_ADDRESS_ONE, oef_addr="127.0.0.1", oef_port=10000, @@ -1071,21 +1070,18 @@ async def test_cancelled_receive(self, pytestconfig): oef_connection.loop = asyncio.get_event_loop() await oef_connection.connect() - patch = unittest.mock.patch.object( - packages.fetchai.connections.oef.connection.logger, "debug" - ) - mocked_logger_debug = patch.start() + with caplog.at_level(logging.DEBUG, "aea.packages.fetchai.connections.oef"): - async def receive(): - await oef_connection.receive() + async def receive(): + await oef_connection.receive() - task = asyncio.ensure_future(receive(), loop=asyncio.get_event_loop()) - await asyncio.sleep(0.1) - task.cancel() - await asyncio.sleep(0.1) - await oef_connection.disconnect() + task = asyncio.ensure_future(receive(), loop=asyncio.get_event_loop()) + await asyncio.sleep(0.1) + task.cancel() + await asyncio.sleep(0.1) + await oef_connection.disconnect() - mocked_logger_debug.assert_called_once_with("Receive cancelled.") + assert "Receive cancelled." in caplog.text @pytest.mark.asyncio async def test_exception_during_receive(self, pytestconfig): @@ -1125,7 +1121,7 @@ async def test_connecting_twice_is_ok(self, pytestconfig): @pytest.mark.skipif( sys.version_info < (3, 7), reason="Python version < 3.7 not supported by the OEF." ) -async def test_cannot_connect_to_oef(): +async def test_cannot_connect_to_oef(caplog): """Test the case when we can't connect to the OEF.""" oef_connection = _make_oef_connection( address=FETCHAI_ADDRESS_ONE, @@ -1133,20 +1129,16 @@ async def test_cannot_connect_to_oef(): oef_port=61234, # use addr instead of hostname to avoid name resolution ) - patch = unittest.mock.patch.object( - packages.fetchai.connections.oef.connection.logger, "warning" - ) - mocked_logger_warning = patch.start() + with caplog.at_level(logging.DEBUG, logger="aea.packages.fetchai.connections.oef"): - task = asyncio.ensure_future( - oef_connection.connect(), loop=asyncio.get_event_loop() - ) - await asyncio.sleep(3.0) - mocked_logger_warning.assert_called_with( - "Cannot connect to OEFChannel. Retrying in 5 seconds..." - ) - await cancel_and_wait(task) - await oef_connection.disconnect() + task = asyncio.ensure_future( + oef_connection.connect(), loop=asyncio.get_event_loop() + ) + await asyncio.sleep(3.0) + assert "Cannot connect to OEFChannel. Retrying in 5 seconds..." in caplog.text + + await cancel_and_wait(task) + await oef_connection.disconnect() @pytest.mark.asyncio diff --git a/tests/test_packages/test_connections/test_p2p_libp2p_client/test_aea_cli.py b/tests/test_packages/test_connections/test_p2p_libp2p_client/test_aea_cli.py index ad5910dea0..c978b470c6 100644 --- a/tests/test_packages/test_connections/test_p2p_libp2p_client/test_aea_cli.py +++ b/tests/test_packages/test_connections/test_p2p_libp2p_client/test_aea_cli.py @@ -62,7 +62,7 @@ def test_node(self): assert self.node_connection.connection_status.is_connected is True def test_connection(self): - self.add_item("connection", "fetchai/p2p_libp2p_client:0.2.0") + self.add_item("connection", "fetchai/p2p_libp2p_client:0.3.0") config_path = "vendor.fetchai.connections.p2p_libp2p_client.config" self.force_set_config( "{}.nodes".format(config_path), diff --git a/tests/test_packages/test_connections/test_tcp/test_base.py b/tests/test_packages/test_connections/test_tcp/test_base.py index 6bb0d5d53a..4df98aaac5 100644 --- a/tests/test_packages/test_connections/test_tcp/test_base.py +++ b/tests/test_packages/test_connections/test_tcp/test_base.py @@ -20,6 +20,7 @@ """This module contains the tests for the TCP base module.""" import asyncio +import logging import unittest.mock from asyncio import CancelledError @@ -28,8 +29,6 @@ from aea.mail.base import Envelope from aea.protocols.default.message import DefaultMessage -import packages - from tests.conftest import ( _make_tcp_client_connection, _make_tcp_server_connection, @@ -38,7 +37,7 @@ @pytest.mark.asyncio -async def test_connect_twice(): +async def test_connect_twice(caplog): """Test that connecting twice the tcp connection works correctly.""" port = get_unused_tcp_port() tcp_connection = _make_tcp_server_connection("address", "127.0.0.1", port) @@ -48,17 +47,15 @@ async def test_connect_twice(): await tcp_connection.connect() await asyncio.sleep(0.1) - with unittest.mock.patch.object( - packages.fetchai.connections.tcp.base.logger, "warning" - ) as mock_logger_warning: + with caplog.at_level(logging.WARNING, "aea.packages.fetchai.connections.tcp"): await tcp_connection.connect() - mock_logger_warning.assert_called_with("Connection already set up.") + assert "Connection already set up." in caplog.text await tcp_connection.disconnect() @pytest.mark.asyncio -async def test_connect_raises_exception(): +async def test_connect_raises_exception(caplog): """Test the case that a connection attempt raises an exception.""" port = get_unused_tcp_port() tcp_connection = _make_tcp_server_connection("address", "127.0.0.1", port) @@ -66,31 +63,27 @@ async def test_connect_raises_exception(): loop = asyncio.get_event_loop() tcp_connection.loop = loop - with unittest.mock.patch.object( - packages.fetchai.connections.tcp.base.logger, "error" - ) as mock_logger_error: + with caplog.at_level(logging.ERROR, "aea.packages.fetchai.connections.tcp"): with unittest.mock.patch.object( tcp_connection, "setup", side_effect=Exception("error during setup") ): await tcp_connection.connect() - mock_logger_error.assert_called_with("error during setup") + assert "error during setup" in caplog.text @pytest.mark.asyncio -async def test_disconnect_when_already_disconnected(): +async def test_disconnect_when_already_disconnected(caplog): """Test that disconnecting a connection already disconnected works correctly.""" port = get_unused_tcp_port() tcp_connection = _make_tcp_server_connection("address", "127.0.0.1", port) - with unittest.mock.patch.object( - packages.fetchai.connections.tcp.base.logger, "warning" - ) as mock_logger_warning: + with caplog.at_level(logging.WARNING, "aea.packages.fetchai.connections.tcp"): await tcp_connection.disconnect() - mock_logger_warning.assert_called_with("Connection already disconnected.") + assert "Connection already disconnected." in caplog.text @pytest.mark.asyncio -async def test_send_to_unknown_destination(): +async def test_send_to_unknown_destination(caplog): """Test that a message to an unknown destination logs an error.""" address = "address" port = get_unused_tcp_port() @@ -101,13 +94,9 @@ async def test_send_to_unknown_destination(): protocol_id=DefaultMessage.protocol_id, message=b"", ) - with unittest.mock.patch.object( - packages.fetchai.connections.tcp.base.logger, "error" - ) as mock_logger_error: + with caplog.at_level(logging.ERROR, "aea.packages.fetchai.connections.tcp"): await tcp_connection.send(envelope) - mock_logger_error.assert_called_with( - "[{}]: Cannot send envelope {}".format(address, envelope) - ) + assert "[{}]: Cannot send envelope {}".format(address, envelope) in caplog.text @pytest.mark.asyncio diff --git a/tests/test_packages/test_connections/test_tcp/test_communication.py b/tests/test_packages/test_connections/test_tcp/test_communication.py index f85883e5b8..1a4502a039 100644 --- a/tests/test_packages/test_connections/test_tcp/test_communication.py +++ b/tests/test_packages/test_connections/test_tcp/test_communication.py @@ -20,6 +20,7 @@ """This module contains the tests for the TCP connection communication.""" import asyncio +import logging import struct import unittest.mock @@ -29,8 +30,6 @@ from aea.multiplexer import Multiplexer from aea.protocols.default.message import DefaultMessage -import packages - from tests.conftest import ( _make_tcp_client_connection, _make_tcp_server_connection, @@ -157,7 +156,7 @@ class TestTCPClientConnection: """Test TCP Client code.""" @pytest.mark.asyncio - async def test_receive_cancelled(self): + async def test_receive_cancelled(self, caplog): """Test that cancelling a receive task works correctly.""" port = get_unused_tcp_port() tcp_server = _make_tcp_server_connection("address_server", "127.0.0.1", port,) @@ -166,23 +165,21 @@ async def test_receive_cancelled(self): await tcp_server.connect() await tcp_client.connect() - with unittest.mock.patch.object( - packages.fetchai.connections.tcp.tcp_client.logger, "debug" - ) as mock_logger_debug: + with caplog.at_level( + logging.DEBUG, "aea.packages.fetchai.connections.tcp.tcp_client" + ): task = asyncio.ensure_future(tcp_client.receive()) await asyncio.sleep(0.1) task.cancel() await asyncio.sleep(0.1) - mock_logger_debug.assert_called_with( - "[{}] Read cancelled.".format("address_client") - ) + assert "[{}] Read cancelled.".format("address_client") in caplog.text assert task.result() is None await tcp_client.disconnect() await tcp_server.disconnect() @pytest.mark.asyncio - async def test_receive_raises_struct_error(self): + async def test_receive_raises_struct_error(self, caplog): """Test the case when a receive raises a struct error.""" port = get_unused_tcp_port() tcp_server = _make_tcp_server_connection("address_server", "127.0.0.1", port,) @@ -191,15 +188,15 @@ async def test_receive_raises_struct_error(self): await tcp_server.connect() await tcp_client.connect() - with unittest.mock.patch.object( - packages.fetchai.connections.tcp.tcp_client.logger, "debug" - ) as mock_logger_debug: + with caplog.at_level( + logging.DEBUG, "aea.packages.fetchai.connections.tcp.tcp_client" + ): with unittest.mock.patch.object( tcp_client, "_recv", side_effect=struct.error ): task = asyncio.ensure_future(tcp_client.receive()) await asyncio.sleep(0.1) - mock_logger_debug.assert_called_with("Struct error: ") + assert "Struct error: " in caplog.text assert task.result() is None await tcp_client.disconnect() @@ -231,7 +228,7 @@ class TestTCPServerConnection: """Test TCP Server code.""" @pytest.mark.asyncio - async def test_receive_raises_exception(self): + async def test_receive_raises_exception(self, caplog): """Test the case when a receive raises a generic exception.""" port = get_unused_tcp_port() tcp_server = _make_tcp_server_connection("address_server", "127.0.0.1", port,) @@ -240,17 +237,15 @@ async def test_receive_raises_exception(self): await tcp_server.connect() await tcp_client.connect() await asyncio.sleep(0.1) - with unittest.mock.patch.object( - packages.fetchai.connections.tcp.tcp_server.logger, "error" - ) as mock_logger_error: + with caplog.at_level( + logging.DEBUG, "aea.packages.fetchai.connections.tcp.tcp_server" + ): with unittest.mock.patch( "asyncio.wait", side_effect=Exception("generic exception") ): result = await tcp_server.receive() assert result is None - mock_logger_error.assert_called_with( - "Error in the receiving loop: generic exception" - ) + assert "Error in the receiving loop: generic exception" in caplog.text await tcp_client.disconnect() await tcp_server.disconnect() diff --git a/tests/test_packages/test_skills/test_http_echo.py b/tests/test_packages/test_skills/test_http_echo.py index a1fc74d058..4a0c8fdf14 100644 --- a/tests/test_packages/test_skills/test_http_echo.py +++ b/tests/test_packages/test_skills/test_http_echo.py @@ -36,9 +36,9 @@ class TestHttpEchoSkill(AEATestCaseEmpty): @skip_test_windows def test_echo(self): """Run the echo skill sequence.""" - self.add_item("connection", "fetchai/http_server:0.4.0") + self.add_item("connection", "fetchai/http_server:0.5.0") self.add_item("skill", "fetchai/http_echo:0.3.0") - self.set_config("agent.default_connection", "fetchai/http_server:0.4.0") + self.set_config("agent.default_connection", "fetchai/http_server:0.5.0") self.set_config( "vendor.fetchai.connections.http_server.config.api_spec_path", API_SPEC_PATH )