From 9e6b9de57ccab60d00a33b38d45fbad8f2135d97 Mon Sep 17 00:00:00 2001 From: MarcoFavorito Date: Mon, 13 Jul 2020 20:00:05 +0200 Subject: [PATCH 01/11] add agent logger adapter and use it in Skill.from_config --- aea/components/base.py | 13 ++++++++++++- aea/helpers/logging.py | 39 +++++++++++++++++++++++++++++++++++++++ aea/skills/base.py | 5 ++++- 3 files changed, 55 insertions(+), 2 deletions(-) create mode 100644 aea/helpers/logging.py diff --git a/aea/components/base.py b/aea/components/base.py index 25a8e2e116..0ff2ab59cb 100644 --- a/aea/components/base.py +++ b/aea/components/base.py @@ -21,8 +21,9 @@ import logging import types from abc import ABC +from logging import Logger, LoggerAdapter from pathlib import Path -from typing import Dict, Optional +from typing import Dict, Optional, Union from aea.configurations.base import ( ComponentConfiguration, @@ -51,6 +52,7 @@ def __init__( self._configuration = configuration self._directory = None # type: Optional[Path] self._is_vendor = is_vendor + self._logger: Optional[Logger] = None # mapping from import path to module object # the keys are dotted paths of Python modules. @@ -100,3 +102,12 @@ def directory(self, path: Path) -> None: """Set the directory. Raise error if already set.""" assert self._directory is None, "Directory already set." self._directory = path + + @property + def logger(self) -> Union[Logger, LoggerAdapter]: + """Get the component logger.""" + if self._logger is None: + # if not set (e.g. programmatic instantiation) + # return a default one with "aea" as logger namespace. + return logging.getLogger("aea") + return self._logger diff --git a/aea/helpers/logging.py b/aea/helpers/logging.py new file mode 100644 index 0000000000..c3a1bea6df --- /dev/null +++ b/aea/helpers/logging.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +# ------------------------------------------------------------------------------ +# +# Copyright 2018-2019 Fetch.AI Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------------ +"""Logging helpers.""" +from logging import Logger, LoggerAdapter +from typing import Any, MutableMapping, Tuple + + +class AgentLoggerAdapter(LoggerAdapter): + """This class is a logger adapter that prepends the agent name to log messages.""" + + def __init__(self, logger: Logger, agent_name: str): + """ + Initialize the logger adapter. + + :param agent_name: the agent name. + """ + super().__init__(logger, dict(agent_name=agent_name)) + + def process( + self, msg: Any, kwargs: MutableMapping[str, Any] + ) -> Tuple[Any, MutableMapping[str, Any]]: + """Prepend the agent name to every log message.""" + return "[%s] %s" % (self.extra["agent_name"], msg), kwargs diff --git a/aea/skills/base.py b/aea/skills/base.py index 5dea68ef4d..6086192f07 100644 --- a/aea/skills/base.py +++ b/aea/skills/base.py @@ -45,6 +45,7 @@ from aea.contracts.base import Contract from aea.exceptions import AEAException from aea.helpers.base import load_aea_package, load_module +from aea.helpers.logging import AgentLoggerAdapter from aea.mail.base import Address from aea.multiplexer import OutBox from aea.protocols.base import Message @@ -704,7 +705,9 @@ def from_config( skill_context = SkillContext() skill_context.set_agent_context(agent_context) logger_name = f"aea.packages.{configuration.author}.skills.{configuration.name}" - skill_context.logger = logging.getLogger(logger_name) + logger = logging.getLogger(logger_name) + logger = AgentLoggerAdapter(logger, agent_context.agent_name) + skill_context.logger = logger skill = Skill(configuration, skill_context) From e327f46cfc49d112ce8568ab9539daab59957bdb Mon Sep 17 00:00:00 2001 From: MarcoFavorito Date: Mon, 13 Jul 2020 21:23:32 +0200 Subject: [PATCH 02/11] fix static typing --- aea/skills/base.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/aea/skills/base.py b/aea/skills/base.py index 6086192f07..4e7aab0e23 100644 --- a/aea/skills/base.py +++ b/aea/skills/base.py @@ -25,11 +25,11 @@ import queue import re from abc import ABC, abstractmethod -from logging import Logger +from logging import Logger, LoggerAdapter from pathlib import Path from queue import Queue from types import SimpleNamespace -from typing import Any, Dict, Optional, Sequence, Set, Tuple, Type, cast +from typing import Any, Dict, Optional, Sequence, Set, Tuple, Type, Union, cast from aea.components.base import Component from aea.configurations.base import ( @@ -74,16 +74,16 @@ def __init__( self._is_active = True # type: bool self._new_behaviours_queue = queue.Queue() # type: Queue - self._logger = None # type: Optional[Logger] + self._logger: Optional[Union[Logger, LoggerAdapter]] = None @property - def logger(self) -> Logger: + def logger(self) -> Union[Logger, LoggerAdapter]: """Get the logger.""" assert self._logger is not None, "Logger not set." return self._logger @logger.setter - def logger(self, logger_: Logger) -> None: + def logger(self, logger_: Union[Logger, AgentLoggerAdapter]) -> None: assert self._logger is None, "Logger already set." self._logger = logger_ @@ -705,8 +705,9 @@ def from_config( skill_context = SkillContext() skill_context.set_agent_context(agent_context) logger_name = f"aea.packages.{configuration.author}.skills.{configuration.name}" - logger = logging.getLogger(logger_name) - logger = AgentLoggerAdapter(logger, agent_context.agent_name) + logger = AgentLoggerAdapter( + logging.getLogger(logger_name), agent_context.agent_name + ) skill_context.logger = logger skill = Skill(configuration, skill_context) From 589b5439d9448196a0724c84897f62f7c4c620d2 Mon Sep 17 00:00:00 2001 From: MarcoFavorito Date: Mon, 13 Jul 2020 23:29:38 +0200 Subject: [PATCH 03/11] attach agent logger to every component --- aea/aea_builder.py | 45 ++++++++++++++++++++++++++++++++++-------- aea/components/base.py | 8 +++++++- aea/skills/base.py | 15 ++++++++++++++ 3 files changed, 59 insertions(+), 9 deletions(-) diff --git a/aea/aea_builder.py b/aea/aea_builder.py index 46fbf05383..5461ec1dd5 100644 --- a/aea/aea_builder.py +++ b/aea/aea_builder.py @@ -84,6 +84,7 @@ from aea.exceptions import AEAException from aea.helpers.base import load_aea_package, load_module from aea.helpers.exception_policy import ExceptionPolicyEnum +from aea.helpers.logging import AgentLoggerAdapter from aea.helpers.pypi import is_satisfiable from aea.helpers.pypi import merge_dependencies from aea.identity.base import Identity @@ -879,11 +880,12 @@ def build(self, connection_ids: Optional[Collection[PublicId]] = None,) -> AEA: copy(self.private_key_paths), copy(self.connection_private_key_paths) ) identity = self._build_identity_from_wallet(wallet) - self._load_and_add_components(ComponentType.PROTOCOL, resources) - self._load_and_add_components(ComponentType.CONTRACT, resources) + self._load_and_add_components(ComponentType.PROTOCOL, resources, identity) + self._load_and_add_components(ComponentType.CONTRACT, resources, identity) self._load_and_add_components( ComponentType.CONNECTION, resources, + identity, identity=identity, crypto_store=wallet.connection_cryptos, ) @@ -908,7 +910,7 @@ def build(self, connection_ids: Optional[Collection[PublicId]] = None,) -> AEA: **deepcopy(self._context_namespace), ) self._load_and_add_components( - ComponentType.SKILL, resources, agent_context=aea.context + ComponentType.SKILL, resources, identity, agent_context=aea.context ) self._build_called = True self._populate_contract_registry() @@ -1346,28 +1348,36 @@ def from_aea_project( return builder def _load_and_add_components( - self, component_type: ComponentType, resources: Resources, **kwargs + self, + component_type: ComponentType, + resources: Resources, + aea_identity: Identity, + **kwargs, ) -> None: """ Load and add components added to the builder to a Resources instance. :param component_type: the component type for which :param resources: the resources object to populate. + :param aea_identity: the identity of the AEA. :param kwargs: keyword argument to forward to the component loader. :return: None """ for configuration in self._package_dependency_manager.get_components_by_type( component_type ).values(): + if configuration.is_abstract_component: + load_aea_package(configuration) + continue + if configuration in self._component_instances[component_type].keys(): component = self._component_instances[component_type][configuration] - resources.add_component(component) - elif configuration.is_abstract_component: - load_aea_package(configuration) else: configuration = deepcopy(configuration) component = load_component_from_config(configuration, **kwargs) - resources.add_component(component) + + _set_logger_to_component(component, configuration, aea_identity) + resources.add_component(component) def _populate_contract_registry(self): """Populate contract registry.""" @@ -1413,6 +1423,25 @@ def _check_we_can_build(self): ) +def _set_logger_to_component( + component: Component, configuration: ComponentConfiguration, identity: Identity, +) -> None: + """ + Set the logger to the component. + + :param component: the component instance. + :param configuration: the component configuration + :param identity: the identity object of the AEA. + :return: None + """ + if configuration.component_type == ComponentType.SKILL: + # skip because skill object already have their own logger from the skill context. + return + logger_name = f"aea.packages.{configuration.author}.{configuration.component_type.to_plural()}.{configuration.name}" + logger = AgentLoggerAdapter(logging.getLogger(logger_name), identity.name) + component.logger = logger + + # TODO this function is repeated in 'aea.cli.utils.package_utils.py' def _verify_or_create_private_keys(aea_project_path: Path) -> None: """Verify or create private keys.""" diff --git a/aea/components/base.py b/aea/components/base.py index 0ff2ab59cb..35ecf99c49 100644 --- a/aea/components/base.py +++ b/aea/components/base.py @@ -52,7 +52,7 @@ def __init__( self._configuration = configuration self._directory = None # type: Optional[Path] self._is_vendor = is_vendor - self._logger: Optional[Logger] = None + self._logger: Optional[Union[Logger, LoggerAdapter]] = None # mapping from import path to module object # the keys are dotted paths of Python modules. @@ -111,3 +111,9 @@ def logger(self) -> Union[Logger, LoggerAdapter]: # return a default one with "aea" as logger namespace. return logging.getLogger("aea") return self._logger + + @logger.setter + def logger(self, logger: Union[Logger, LoggerAdapter]): + """Set the logger.""" + assert self._logger is None, "Logger already set." + self._logger = logger diff --git a/aea/skills/base.py b/aea/skills/base.py index 4e7aab0e23..f651a75cc7 100644 --- a/aea/skills/base.py +++ b/aea/skills/base.py @@ -684,6 +684,21 @@ def from_dir(cls, directory: str, agent_context: AgentContext) -> "Skill": configuration.directory = Path(directory) return Skill.from_config(configuration, agent_context) + @property + def logger(self) -> Union[Logger, LoggerAdapter]: + """ + Get the logger. + + In the case of a skill, return the + logger provided by the skill context. + """ + return self.skill_context.logger + + @logger.setter + def logger(self, *args) -> None: + """Set the logger.""" + raise ValueError("Cannot set logger to a skill component..") + @classmethod def from_config( cls, configuration: SkillConfig, agent_context: AgentContext From 331d56242fa137440b2b04f2ec0630faba91d2a2 Mon Sep 17 00:00:00 2001 From: MarcoFavorito Date: Mon, 13 Jul 2020 23:58:14 +0200 Subject: [PATCH 04/11] remove check in Component.logger setter --- aea/components/base.py | 1 - 1 file changed, 1 deletion(-) diff --git a/aea/components/base.py b/aea/components/base.py index 35ecf99c49..a4f013073d 100644 --- a/aea/components/base.py +++ b/aea/components/base.py @@ -115,5 +115,4 @@ def logger(self) -> Union[Logger, LoggerAdapter]: @logger.setter def logger(self, logger: Union[Logger, LoggerAdapter]): """Set the logger.""" - assert self._logger is None, "Logger already set." self._logger = logger From 9f5672b164f8f612bdfc772d632c5b1c2b9992f3 Mon Sep 17 00:00:00 2001 From: MarcoFavorito Date: Tue, 14 Jul 2020 00:34:24 +0200 Subject: [PATCH 05/11] add loggers to multiplexer, task manager, registries and loop --- aea/aea.py | 18 ++++++ aea/agent_loop.py | 16 ++++-- aea/components/base.py | 18 +----- aea/helpers/logging.py | 29 +++++++++- aea/multiplexer.py | 111 ++++++++++++++++++++---------------- aea/registries/base.py | 23 +++++--- aea/registries/resources.py | 10 ++++ aea/skills/tasks.py | 17 +++--- 8 files changed, 156 insertions(+), 86 deletions(-) diff --git a/aea/aea.py b/aea/aea.py index 7027bb452b..2470087ad5 100644 --- a/aea/aea.py +++ b/aea/aea.py @@ -34,6 +34,7 @@ from aea.exceptions import AEAException from aea.helpers.exception_policy import ExceptionPolicyEnum from aea.helpers.exec_timeout import ExecTimeoutThreadGuard, TimeoutException +from aea.helpers.logging import AgentLoggerAdapter from aea.identity.base import Identity from aea.mail.base import Envelope from aea.protocols.base import Message @@ -135,6 +136,8 @@ def __init__( self._skills_exception_policy = skill_exception_policy + self._setup_loggers() + @property def decision_maker(self) -> DecisionMaker: """Get decision maker.""" @@ -383,3 +386,18 @@ def teardown(self) -> None: self.task_manager.stop() self.resources.teardown() ExecTimeoutThreadGuard.stop() + + def _setup_loggers(self): + """Setup logger with agent name. """ + for element in [ + self.main_loop, + self.multiplexer, + self.task_manager, + self.resources.component_registry, + self.resources.behaviour_registry, + self.resources.handler_registry, + self.resources.model_registry, + ]: + element.logger = AgentLoggerAdapter( + element.logger, agent_name=self._identity.name + ) diff --git a/aea/agent_loop.py b/aea/agent_loop.py index 5f6d176a2d..639fc0fea5 100644 --- a/aea/agent_loop.py +++ b/aea/agent_loop.py @@ -39,6 +39,7 @@ PeriodicCaller, ensure_loop, ) +from aea.helpers.logging import WithLogger from aea.multiplexer import InBox from aea.skills.base import Behaviour @@ -50,7 +51,7 @@ from aea.agent import Agent # pragma: no cover -class BaseAgentLoop(ABC): +class BaseAgentLoop(WithLogger, ABC): """Base abstract agent loop class.""" def __init__( @@ -61,6 +62,7 @@ def __init__( :params agent: Agent or AEA to run. :params loop: optional asyncio event loop. if not specified a new loop will be created. """ + WithLogger.__init__(self, logger) self._agent: "Agent" = agent self.set_loop(ensure_loop(loop)) self._tasks: List[asyncio.Task] = [] @@ -77,7 +79,7 @@ def start(self) -> None: async def run_loop(self) -> None: """Run agent loop.""" - logger.debug("agent loop started") + self.logger.debug("agent loop started") self._state.set(AgentLoopStates.started) self._set_tasks() try: @@ -171,7 +173,9 @@ def _behaviour_exception_callback(self, fn: Callable, exc: Exception) -> None: :return: None """ - logger.exception(f"Loop: Exception: `{exc}` occured during `{fn}` processing") + self.logger.exception( + f"Loop: Exception: `{exc}` occured during `{fn}` processing" + ) self._exceptions.append(exc) self._state.set(AgentLoopStates.error) @@ -200,7 +204,7 @@ def _register_behaviour(self, behaviour: Behaviour) -> None: ) self._behaviours_registry[behaviour] = periodic_caller periodic_caller.start() - logger.debug(f"Behaviour {behaviour} registered.") + self.logger.debug(f"Behaviour {behaviour} registered.") def _register_all_behaviours(self) -> None: """Register all AEA behaviours to run periodically.""" @@ -237,7 +241,7 @@ def _stop_tasks(self): def _set_tasks(self): """Set run loop tasks.""" self._tasks = self._create_tasks() - logger.debug("tasks created!") + self.logger.debug("tasks created!") def _create_tasks(self) -> List[Task]: """ @@ -256,7 +260,7 @@ def _create_tasks(self) -> List[Task]: async def _task_process_inbox(self) -> None: """Process incoming messages.""" inbox: InBox = self._agent.inbox - logger.info("[{}]: Start processing messages...".format(self._agent.name)) + self.logger.info("[{}]: Start processing messages...".format(self._agent.name)) while self.is_running: await inbox.async_wait() diff --git a/aea/components/base.py b/aea/components/base.py index a4f013073d..16a45bd7d8 100644 --- a/aea/components/base.py +++ b/aea/components/base.py @@ -31,11 +31,12 @@ ComponentType, PublicId, ) +from aea.helpers.logging import WithLogger logger = logging.getLogger(__name__) -class Component(ABC): +class Component(ABC, WithLogger): """Abstract class for an agent component.""" def __init__( @@ -49,6 +50,7 @@ def __init__( :param configuration: the package configuration. :param is_vendor: whether the package is vendorized. """ + WithLogger.__init__(self, logging.getLogger("aea")) self._configuration = configuration self._directory = None # type: Optional[Path] self._is_vendor = is_vendor @@ -102,17 +104,3 @@ def directory(self, path: Path) -> None: """Set the directory. Raise error if already set.""" assert self._directory is None, "Directory already set." self._directory = path - - @property - def logger(self) -> Union[Logger, LoggerAdapter]: - """Get the component logger.""" - if self._logger is None: - # if not set (e.g. programmatic instantiation) - # return a default one with "aea" as logger namespace. - return logging.getLogger("aea") - return self._logger - - @logger.setter - def logger(self, logger: Union[Logger, LoggerAdapter]): - """Set the logger.""" - self._logger = logger diff --git a/aea/helpers/logging.py b/aea/helpers/logging.py index c3a1bea6df..1a23901e91 100644 --- a/aea/helpers/logging.py +++ b/aea/helpers/logging.py @@ -17,8 +17,9 @@ # # ------------------------------------------------------------------------------ """Logging helpers.""" +import logging from logging import Logger, LoggerAdapter -from typing import Any, MutableMapping, Tuple +from typing import Any, MutableMapping, Optional, Tuple, Union class AgentLoggerAdapter(LoggerAdapter): @@ -37,3 +38,29 @@ def process( ) -> Tuple[Any, MutableMapping[str, Any]]: """Prepend the agent name to every log message.""" return "[%s] %s" % (self.extra["agent_name"], msg), kwargs + + +class WithLogger: + """Interface to endow subclasses with a logger.""" + + def __init__(self, logger: Optional[Union[Logger, LoggerAdapter]] = None): + """ + Initialize the logger. + + :param logger: the logger object. + """ + self._logger = logger + + @property + def logger(self) -> Union[Logger, LoggerAdapter]: + """Get the component logger.""" + if self._logger is None: + # if not set (e.g. programmatic instantiation) + # return a default one with "aea" as logger namespace. + return logging.getLogger("aea") + return self._logger + + @logger.setter + def logger(self, logger: Union[Logger, LoggerAdapter]): + """Set the logger.""" + self._logger = logger diff --git a/aea/multiplexer.py b/aea/multiplexer.py index b274de8b32..520a7f2f54 100644 --- a/aea/multiplexer.py +++ b/aea/multiplexer.py @@ -28,18 +28,19 @@ from aea.connections.base import Connection, ConnectionStatus from aea.helpers.async_friendly_queue import AsyncFriendlyQueue from aea.helpers.async_utils import ThreadedAsyncRunner, cancel_and_wait +from aea.helpers.logging import WithLogger from aea.mail.base import ( AEAConnectionError, Address, Empty, Envelope, EnvelopeContext, - logger, + logger as default_logger, ) from aea.protocols.base import Message -class AsyncMultiplexer: +class AsyncMultiplexer(WithLogger): """This class can handle multiple connections at once.""" def __init__( @@ -56,7 +57,9 @@ def __init__( This information is used for envelopes which don't specify any routing context. If connections is None, this parameter is ignored. :param loop: the event loop to run the multiplexer. If None, a new event loop is created. + :param agent_name: the name of the agent that owns the multiplexer, for logging purposes. """ + super().__init__(default_logger) self._connections: List[Connection] = [] self._id_to_connection: Dict[PublicId, Connection] = {} self._default_connection: Optional[Connection] = None @@ -107,7 +110,7 @@ def add_connection(self, connection: Connection, is_default: bool = False) -> No :return: None """ if connection.connection_id in self._id_to_connection: - logger.warning( + self.logger.warning( f"A connection with id {connection.connection_id} was already added. Replacing it..." ) @@ -174,13 +177,13 @@ def connection_status(self) -> ConnectionStatus: async def connect(self) -> None: """Connect the multiplexer.""" - logger.debug("Multiplexer connecting...") + self.logger.debug("Multiplexer connecting...") self._connection_consistency_checks() self._set_default_connection_if_none() self._out_queue = asyncio.Queue() async with self._lock: if self.connection_status.is_connected: - logger.debug("Multiplexer already connected.") + self.logger.debug("Multiplexer already connected.") return try: await self._connect_all() @@ -188,28 +191,28 @@ async def connect(self) -> None: self._connection_status.is_connected = True self._recv_loop_task = self._loop.create_task(self._receiving_loop()) self._send_loop_task = self._loop.create_task(self._send_loop()) - logger.debug("Multiplexer connected and running.") + self.logger.debug("Multiplexer connected and running.") except (CancelledError, Exception): - logger.exception("Exception on connect:") + self.logger.exception("Exception on connect:") self._connection_status.is_connected = False await self._stop() raise AEAConnectionError("Failed to connect the multiplexer.") async def disconnect(self) -> None: """Disconnect the multiplexer.""" - logger.debug("Multiplexer disconnecting...") + self.logger.debug("Multiplexer disconnecting...") async with self._lock: if not self.connection_status.is_connected: - logger.debug("Multiplexer already disconnected.") + self.logger.debug("Multiplexer already disconnected.") await asyncio.wait_for(self._stop(), timeout=60) return try: await asyncio.wait_for(self._disconnect_all(), timeout=60) await asyncio.wait_for(self._stop(), timeout=60) self._connection_status.is_connected = False - logger.debug("Multiplexer disconnected.") + self.logger.debug("Multiplexer disconnected.") except (CancelledError, Exception): - logger.exception("Exception on disconnect:") + self.logger.exception("Exception on disconnect:") raise AEAConnectionError("Failed to disconnect the multiplexer.") async def _stop(self) -> None: @@ -219,7 +222,7 @@ async def _stop(self) -> None: Stops recv and send loops. Disconnect every connection. """ - logger.debug("Stopping multiplexer...") + self.logger.debug("Stopping multiplexer...") await cancel_and_wait(self._recv_loop_task) self._recv_loop_task = None @@ -235,18 +238,18 @@ async def _stop(self) -> None: if c.connection_status.is_connected or c.connection_status.is_connecting ]: await connection.disconnect() - logger.debug("Multiplexer stopped.") + self.logger.debug("Multiplexer stopped.") async def _connect_all(self) -> None: """Set all the connection up.""" - logger.debug("Starting multiplexer connections.") + self.logger.debug("Starting multiplexer connections.") connected = [] # type: List[PublicId] for connection_id, connection in self._id_to_connection.items(): try: await self._connect_one(connection_id) connected.append(connection_id) except Exception as e: # pylint: disable=broad-except - logger.error( + self.logger.error( "Error while connecting {}: {}".format( str(type(connection)), str(e) ) @@ -254,7 +257,7 @@ async def _connect_all(self) -> None: for c in connected: await self._disconnect_one(c) break - logger.debug("Multiplexer connections are set.") + self.logger.debug("Multiplexer connections are set.") async def _connect_one(self, connection_id: PublicId) -> None: """ @@ -264,15 +267,15 @@ async def _connect_one(self, connection_id: PublicId) -> None: :return: None """ connection = self._id_to_connection[connection_id] - logger.debug("Processing connection {}".format(connection.connection_id)) + self.logger.debug("Processing connection {}".format(connection.connection_id)) if connection.connection_status.is_connected: - logger.debug( + self.logger.debug( "Connection {} already established.".format(connection.connection_id) ) else: connection.loop = self._loop await connection.connect() - logger.debug( + self.logger.debug( "Connection {} has been set up successfully.".format( connection.connection_id ) @@ -280,12 +283,12 @@ async def _connect_one(self, connection_id: PublicId) -> None: async def _disconnect_all(self) -> None: """Tear all the connections down.""" - logger.debug("Tear the multiplexer connections down.") + self.logger.debug("Tear the multiplexer connections down.") for connection_id, connection in self._id_to_connection.items(): try: await self._disconnect_one(connection_id) except Exception as e: # pylint: disable=broad-except - logger.error( + self.logger.error( "Error while disconnecting {}: {}".format( str(type(connection)), str(e) ) @@ -299,14 +302,14 @@ async def _disconnect_one(self, connection_id: PublicId) -> None: :return: None """ connection = self._id_to_connection[connection_id] - logger.debug("Processing connection {}".format(connection.connection_id)) + self.logger.debug("Processing connection {}".format(connection.connection_id)) if not connection.connection_status.is_connected: - logger.debug( + self.logger.debug( "Connection {} already disconnected.".format(connection.connection_id) ) else: await connection.disconnect() - logger.debug( + self.logger.debug( "Connection {} has been disconnected successfully.".format( connection.connection_id ) @@ -315,39 +318,41 @@ async def _disconnect_one(self, connection_id: PublicId) -> None: async def _send_loop(self) -> None: """Process the outgoing envelopes.""" if not self.is_connected: - logger.debug("Sending loop not started. The multiplexer is not connected.") + self.logger.debug( + "Sending loop not started. The multiplexer is not connected." + ) return while self.is_connected: try: - logger.debug("Waiting for outgoing envelopes...") + self.logger.debug("Waiting for outgoing envelopes...") envelope = await self.out_queue.get() if envelope is None: - logger.debug( + self.logger.debug( "Received empty envelope. Quitting the sending loop..." ) return None - logger.debug("Sending envelope {}".format(str(envelope))) + self.logger.debug("Sending envelope {}".format(str(envelope))) await self._send(envelope) except asyncio.CancelledError: - logger.debug("Sending loop cancelled.") + self.logger.debug("Sending loop cancelled.") return except AEAConnectionError as e: - logger.error(str(e)) + self.logger.error(str(e)) except Exception as e: # pylint: disable=broad-except - logger.error("Error in the sending loop: {}".format(str(e))) + self.logger.error("Error in the sending loop: {}".format(str(e))) return async def _receiving_loop(self) -> None: """Process incoming envelopes.""" - logger.debug("Starting receving loop...") + self.logger.debug("Starting receving loop...") task_to_connection = { asyncio.ensure_future(conn.receive()): conn for conn in self.connections } while self.connection_status.is_connected and len(task_to_connection) > 0: try: - # logger.debug("Waiting for incoming envelopes...") + # self.self.logger.debug("Waiting for incoming envelopes...") done, _pending = await asyncio.wait( task_to_connection.keys(), return_when=asyncio.FIRST_COMPLETED ) @@ -365,17 +370,17 @@ async def _receiving_loop(self) -> None: task_to_connection[new_task] = connection except asyncio.CancelledError: - logger.debug("Receiving loop cancelled.") + self.logger.debug("Receiving loop cancelled.") break except Exception as e: # pylint: disable=broad-except - logger.error("Error in the receiving loop: {}".format(str(e))) - logger.exception("Error in the receiving loop: {}".format(str(e))) + self.logger.error("Error in the receiving loop: {}".format(str(e))) + self.logger.exception("Error in the receiving loop: {}".format(str(e))) break # cancel all the receiving tasks. for t in task_to_connection.keys(): t.cancel() - logger.debug("Receiving loop terminated.") + self.logger.debug("Receiving loop terminated.") async def _send(self, envelope: Envelope) -> None: """ @@ -395,7 +400,7 @@ async def _send(self, envelope: Envelope) -> None: # second, try to route by default routing if connection_id is None and envelope.protocol_id in self.default_routing: connection_id = self.default_routing[envelope.protocol_id] - logger.debug("Using default routing: {}".format(connection_id)) + self.logger.debug("Using default routing: {}".format(connection_id)) if connection_id is not None and connection_id not in self._id_to_connection: raise AEAConnectionError( @@ -403,7 +408,9 @@ async def _send(self, envelope: Envelope) -> None: ) if connection_id is None: - logger.debug("Using default connection: {}".format(self.default_connection)) + self.logger.debug( + "Using default connection: {}".format(self.default_connection) + ) connection = self.default_connection else: connection = self._id_to_connection[connection_id] @@ -413,7 +420,7 @@ async def _send(self, envelope: Envelope) -> None: len(connection.restricted_to_protocols) > 0 and envelope.protocol_id not in connection.restricted_to_protocols ): - logger.warning( + self.logger.warning( "Connection {} cannot handle protocol {}. Cannot send the envelope.".format( connection.connection_id, envelope.protocol_id ) @@ -532,7 +539,7 @@ def disconnect(self) -> None: # type: ignore # cause overrides coroutine # pyli Also stops a dedicated thread for event loop if spawned on connect. """ - logger.debug("Disconnect called") + self.logger.debug("Disconnect called") with self._sync_lock: if not self._loop.is_running(): return @@ -540,12 +547,12 @@ def disconnect(self) -> None: # type: ignore # cause overrides coroutine # pyli if self._is_connected: self._thread_runner.call(super().disconnect()).result(240) self._is_connected = False - logger.debug("Disconnect async method executed") + self.logger.debug("Disconnect async method executed") if self._thread_runner.is_alive() and self._thread_was_started: self._thread_runner.stop() - logger.debug("Thread stopped") - logger.debug("Disconnected") + self.logger.debug("Thread stopped") + self.logger.debug("Disconnected") def put(self, envelope: Envelope) -> None: # type: ignore # cause overrides coroutine """ @@ -609,11 +616,11 @@ def get(self, block: bool = False, timeout: Optional[float] = None) -> Envelope: :return: the envelope object. :raises Empty: if the attempt to get an envelope fails. """ - logger.debug("Checks for envelope from the in queue...") + self._multiplexer.logger.debug("Checks for envelope from the in queue...") envelope = self._multiplexer.get(block=block, timeout=timeout) if envelope is None: raise Empty() - logger.debug( + self._multiplexer.logger.debug( "Incoming envelope: to='{}' sender='{}' protocol_id='{}' message='{!r}'".format( envelope.to, envelope.sender, envelope.protocol_id, envelope.message ) @@ -638,11 +645,13 @@ async def async_get(self) -> Envelope: :return: the envelope object. """ - logger.debug("Checks for envelope from the in queue async way...") + self._multiplexer.logger.debug( + "Checks for envelope from the in queue async way..." + ) envelope = await self._multiplexer.async_get() if envelope is None: raise Empty() - logger.debug( + self._multiplexer.logger.debug( "Incoming envelope: to='{}' sender='{}' protocol_id='{}' message='{!r}'".format( envelope.to, envelope.sender, envelope.protocol_id, envelope.message ) @@ -655,7 +664,9 @@ async def async_wait(self) -> None: :return: the envelope object. """ - logger.debug("Checks for envelope presents in queue async way...") + self._multiplexer.logger.debug( + "Checks for envelope presents in queue async way..." + ) await self._multiplexer.async_wait() @@ -688,7 +699,7 @@ def put(self, envelope: Envelope) -> None: :param envelope: the envelope. :return: None """ - logger.debug( + self._multiplexer.logger.debug( "Put an envelope in the queue: to='{}' sender='{}' protocol_id='{}' message='{!r}' context='{}'...".format( envelope.to, envelope.sender, diff --git a/aea/registries/base.py b/aea/registries/base.py index e8bdd950ec..9b84dc4a29 100644 --- a/aea/registries/base.py +++ b/aea/registries/base.py @@ -32,6 +32,7 @@ PublicId, SkillId, ) +from aea.helpers.logging import WithLogger from aea.skills.base import Behaviour, Handler, Model logger = logging.getLogger(__name__) @@ -41,9 +42,13 @@ SkillComponentType = TypeVar("SkillComponentType", Handler, Behaviour, Model) -class Registry(Generic[ItemId, Item], ABC): +class Registry(Generic[ItemId, Item], WithLogger, ABC): """This class implements an abstract registry.""" + def __init__(self): + """Initialize the registry.""" + super().__init__(logger) + @abstractmethod def register(self, item_id: ItemId, item: Item) -> None: """ @@ -108,6 +113,7 @@ def __init__(self) -> None: :return: None """ + super().__init__() self._components_by_type: Dict[ComponentType, Dict[PublicId, Component]] = {} self._registered_keys: Set[ComponentId] = set() @@ -157,7 +163,9 @@ def _unregister(self, component_id: ComponentId) -> None: ) self._registered_keys.discard(component_id) if item is not None: - logger.debug("Component '{}' has been removed.".format(item.component_id)) + self.logger.debug( + "Component '{}' has been removed.".format(item.component_id) + ) def unregister( self, component_id: ComponentId @@ -236,6 +244,7 @@ def __init__(self) -> None: :return: None """ + super().__init__() self._items = {} # type: Dict[SkillId, Dict[str, SkillComponentType]] def register(self, item_id: Tuple[SkillId, str], item: SkillComponentType) -> None: @@ -272,7 +281,7 @@ def unregister(self, item_id: Tuple[SkillId, str]) -> None: raise ValueError( "No item registered with component id '{}'".format(item_id) ) - logger.debug("Unregistering item with id {}".format(item_id)) + self.logger.debug("Unregistering item with id {}".format(item_id)) name_to_item.pop(item_name) if len(name_to_item) == 0: @@ -315,14 +324,14 @@ def setup(self) -> None: """ for item in self.fetch_all(): if item.context.is_active: - logger.debug( + self.logger.debug( "Calling setup() of component {} of skill {}".format( item.name, item.skill_id ) ) item.setup() else: - logger.debug( + self.logger.debug( "Ignoring setup() of component {} of skill {}, because the skill is not active.".format( item.name, item.skill_id ) @@ -339,7 +348,7 @@ def teardown(self) -> None: try: item.teardown() except Exception as e: # pragma: nocover # pylint: disable=broad-except - logger.warning( + self.logger.warning( "An error occurred while tearing down item {}/{}: {}".format( skill_id, type(item).__name__, str(e) ) @@ -411,7 +420,7 @@ def unregister(self, item_id: Tuple[SkillId, str]) -> None: raise ValueError( "No item registered with component id '{}'".format(item_id) ) - logger.debug("Unregistering item with id {}".format(item_id)) + self.logger.debug("Unregistering item with id {}".format(item_id)) handler = name_to_item.pop(item_name) if len(name_to_item) == 0: diff --git a/aea/registries/resources.py b/aea/registries/resources.py index 86fa887103..2c0d55c241 100644 --- a/aea/registries/resources.py +++ b/aea/registries/resources.py @@ -63,6 +63,11 @@ def __init__(self) -> None: self._model_registry, ] # type: List[Registry] + @property + def component_registry(self) -> AgentComponentRegistry: + """Get the agent component registry.""" + return self._component_registry + @property def behaviour_registry(self) -> ComponentRegistry[Behaviour]: """Get the behaviour registry.""" @@ -73,6 +78,11 @@ def handler_registry(self) -> HandlerRegistry: """Get the handler registry.""" return self._handler_registry + @property + def model_registry(self) -> ComponentRegistry[Model]: + """Get the model registry.""" + return self._model_registry + def add_component(self, component: Component): """Add a component to resources.""" if component.component_type == ComponentType.PROTOCOL: diff --git a/aea/skills/tasks.py b/aea/skills/tasks.py index 5f68adebcb..50a0e5a15b 100644 --- a/aea/skills/tasks.py +++ b/aea/skills/tasks.py @@ -25,6 +25,8 @@ from multiprocessing.pool import AsyncResult, Pool from typing import Any, Callable, Dict, Optional, Sequence, cast +from aea.helpers.logging import WithLogger + logger = logging.getLogger(__name__) @@ -120,7 +122,7 @@ def init_worker() -> None: # signal.signal(signal.CTRL_C_EVENT, signal.SIG_IGN) -class TaskManager: +class TaskManager(WithLogger): """A Task manager.""" def __init__(self, nb_workers: int = 1, is_lazy_pool_start: bool = True): @@ -130,6 +132,7 @@ def __init__(self, nb_workers: int = 1, is_lazy_pool_start: bool = True): :param nb_workers: the number of worker processes. :param is_lazy_pool_start: option to postpone pool creation till the first enqueue_task called. """ + WithLogger.__init__(self, logger) self._nb_workers = nb_workers self._is_lazy_pool_start = is_lazy_pool_start self._pool = None # type: Optional[Pool] @@ -207,9 +210,9 @@ def start(self) -> None: """ with self._lock: if self._stopped is False: - logger.debug("Task manager already running.") + self.logger.debug("Task manager already running.") else: - logger.debug("Start the task manager.") + self.logger.debug("Start the task manager.") self._stopped = False if not self._is_lazy_pool_start: self._start_pool() @@ -222,9 +225,9 @@ def stop(self) -> None: """ with self._lock: if self._stopped is True: - logger.debug("Task manager already stopped.") + self.logger.debug("Task manager already stopped.") else: - logger.debug("Stop the task manager.") + self.logger.debug("Stop the task manager.") self._stopped = True self._stop_pool() @@ -237,7 +240,7 @@ def _start_pool(self) -> None: :return: None """ if self._pool: - logger.debug("Pool was already started!.") + self.logger.debug("Pool was already started!.") return self._pool = Pool(self._nb_workers, initializer=init_worker) @@ -248,7 +251,7 @@ def _stop_pool(self) -> None: :return: None """ if not self._pool: - logger.debug("Pool is not started!.") + self.logger.debug("Pool is not started!.") return self._pool = cast(Pool, self._pool) From 8dbfc1759572ffde062b266880f9f7bdc2d7c740 Mon Sep 17 00:00:00 2001 From: MarcoFavorito Date: Tue, 14 Jul 2020 01:49:42 +0200 Subject: [PATCH 06/11] update connection to use self.logger --- .../fetchai/connections/gym/connection.py | 6 ++- .../fetchai/connections/gym/connection.yaml | 2 +- .../connections/http_client/connection.py | 18 ++++--- .../connections/http_client/connection.yaml | 2 +- .../connections/http_server/connection.py | 17 ++++--- .../connections/http_server/connection.yaml | 2 +- .../fetchai/connections/local/connection.py | 4 +- .../fetchai/connections/local/connection.yaml | 2 +- .../fetchai/connections/oef/connection.py | 43 ++++++++++------- .../fetchai/connections/oef/connection.yaml | 2 +- .../connections/p2p_client/connection.py | 16 ++++--- .../connections/p2p_client/connection.yaml | 2 +- .../connections/p2p_libp2p/connection.py | 45 +++++++++-------- .../connections/p2p_libp2p/connection.yaml | 2 +- .../p2p_libp2p_client/connection.py | 18 +++---- .../p2p_libp2p_client/connection.yaml | 2 +- .../fetchai/connections/soef/connection.py | 8 ++-- .../fetchai/connections/soef/connection.yaml | 2 +- packages/fetchai/connections/tcp/base.py | 14 +++--- .../fetchai/connections/tcp/connection.yaml | 6 +-- .../fetchai/connections/tcp/tcp_client.py | 16 ++++--- .../fetchai/connections/tcp/tcp_server.py | 16 +++---- .../fetchai/connections/webhook/connection.py | 4 +- .../connections/webhook/connection.yaml | 2 +- packages/hashes.csv | 22 ++++----- .../test_oef/test_communication.py | 48 ++++++++----------- .../test_connections/test_tcp/test_base.py | 37 +++++--------- .../test_tcp/test_communication.py | 37 +++++++------- 28 files changed, 200 insertions(+), 195 deletions(-) diff --git a/packages/fetchai/connections/gym/connection.py b/packages/fetchai/connections/gym/connection.py index 9ec55d4115..3b53544934 100644 --- a/packages/fetchai/connections/gym/connection.py +++ b/packages/fetchai/connections/gym/connection.py @@ -24,7 +24,7 @@ from asyncio import CancelledError from asyncio.events import AbstractEventLoop from concurrent.futures.thread import ThreadPoolExecutor -from typing import Optional, cast +from typing import Optional, Union, cast import gym @@ -57,6 +57,7 @@ def __init__(self, address: Address, gym_env: gym.Env): self._threaded_pool: ThreadPoolExecutor = ThreadPoolExecutor( self.THREAD_POOL_SIZE ) + self.logger: Union[logging.Logger, logging.LoggerAdapter] = logger @property def queue(self) -> asyncio.Queue: @@ -83,7 +84,7 @@ async def send(self, envelope: Envelope) -> None: :return: None """ sender = envelope.sender - logger.debug("Processing message from {}: {}".format(sender, envelope)) + self.logger.debug("Processing message from {}: {}".format(sender, envelope)) if envelope.protocol_id != GymMessage.protocol_id: raise ValueError("This protocol is not valid for gym.") await self.handle_gym_message(envelope) @@ -183,6 +184,7 @@ async def connect(self) -> None: """ if not self.connection_status.is_connected: self.connection_status.is_connected = True + self.channel.logger = self.logger await self.channel.connect() async def disconnect(self) -> None: diff --git a/packages/fetchai/connections/gym/connection.yaml b/packages/fetchai/connections/gym/connection.yaml index 10f02515ae..7515f475b5 100644 --- a/packages/fetchai/connections/gym/connection.yaml +++ b/packages/fetchai/connections/gym/connection.yaml @@ -6,7 +6,7 @@ license: Apache-2.0 aea_version: '>=0.5.0, <0.6.0' fingerprint: __init__.py: QmWwxj1hGGZNteCvRtZxwtY9PuEKsrWsEmMWCKwiYCdvRR - connection.py: QmV2REDadG36ogXD3eW4Ms82gUfWdjAQJcNJ6ik48P1CC4 + connection.py: QmZHUedJDmV2X1kXcjjyZHwWbwV3553QEKSUYcK6NTtr4F fingerprint_ignore_patterns: [] protocols: - fetchai/gym:0.3.0 diff --git a/packages/fetchai/connections/http_client/connection.py b/packages/fetchai/connections/http_client/connection.py index 7d18ea8b4d..a9e54435ec 100644 --- a/packages/fetchai/connections/http_client/connection.py +++ b/packages/fetchai/connections/http_client/connection.py @@ -85,9 +85,12 @@ def __init__( ) # type: Optional[asyncio.AbstractEventLoop] # pragma: no cover self.excluded_protocols = excluded_protocols self.is_stopped = True - logger.info("Initialised the HTTP client channel") self._tasks: Set[Task] = set() + self.logger = logger + # TODO logger at this point is the module-level one, not with the agent name. + self.logger.info("Initialised the HTTP client channel") + async def connect(self, loop: AbstractEventLoop) -> None: """ Connect channel using loop. @@ -160,7 +163,7 @@ async def _perform_http_request( await resp.read() return resp except Exception: # pragma: nocover # pylint: disable=broad-except - logger.exception( + self.logger.exception( f"Exception raised during http call: {request_http_message.method} {request_http_message.url}" ) raise @@ -186,7 +189,7 @@ def send(self, request_envelope: Envelope) -> None: return if request_envelope.protocol_id in (self.excluded_protocols or []): - logger.error( + self.logger.error( "This envelope cannot be sent with the http client connection: protocol_id={}".format( request_envelope.protocol_id ) @@ -202,7 +205,7 @@ def send(self, request_envelope: Envelope) -> None: if ( request_http_message.performative != HttpMessage.Performative.REQUEST ): # pragma: nocover - logger.warning( + self.logger.warning( "The HTTPMessage performative must be a REQUEST. Envelop dropped." ) return @@ -222,7 +225,7 @@ def _task_done_callback(self, task: Task) -> None: :return: None """ self._tasks.remove(task) - logger.debug(f"Task completed: {task}") + self.logger.debug(f"Task completed: {task}") async def get_message(self) -> Union["Envelope", None]: """ @@ -298,7 +301,7 @@ async def _cancel_tasks(self) -> None: async def disconnect(self) -> None: """Disconnect.""" if not self.is_stopped: - logger.info("HTTP Client has shutdown on port: {}.".format(self.port)) + self.logger.info("HTTP Client has shutdown on port: {}.".format(self.port)) self.is_stopped = True await self._cancel_tasks() @@ -331,6 +334,7 @@ async def connect(self) -> None: """ if not self.connection_status.is_connected: self.connection_status.is_connected = True + self.channel.logger = self.logger await self.channel.connect(self._loop) async def disconnect(self) -> None: @@ -369,5 +373,5 @@ async def receive(self, *args, **kwargs) -> Optional[Union["Envelope", None]]: try: return await self.channel.get_message() except Exception: # pragma: nocover # pylint: disable=broad-except - logger.exception("Exception on receive") + self.logger.exception("Exception on receive") return None diff --git a/packages/fetchai/connections/http_client/connection.yaml b/packages/fetchai/connections/http_client/connection.yaml index 6b882aca04..e05423427c 100644 --- a/packages/fetchai/connections/http_client/connection.yaml +++ b/packages/fetchai/connections/http_client/connection.yaml @@ -7,7 +7,7 @@ license: Apache-2.0 aea_version: '>=0.5.0, <0.6.0' fingerprint: __init__.py: QmPdKAks8A6XKAgZiopJzPZYXJumTeUqChd8UorqmLQQPU - connection.py: QmancYRcofdt3wSti4RymqTNWYbLtnbjxKYpB4z2LERrWd + connection.py: QmVBfBJRLfKHaU1afrcSumDrfPuNa7it6mxLq4pQitRtW9 fingerprint_ignore_patterns: [] protocols: - fetchai/http:0.3.0 diff --git a/packages/fetchai/connections/http_server/connection.py b/packages/fetchai/connections/http_server/connection.py index 5d4568bbc7..07180acbaf 100644 --- a/packages/fetchai/connections/http_server/connection.py +++ b/packages/fetchai/connections/http_server/connection.py @@ -348,6 +348,8 @@ def __init__( self.http_server: Optional[web.TCPSite] = None self.pending_requests: Dict[RequestId, Future] = {} + self.logger = logger + @property def api_spec(self) -> APISpec: """Get the api spec.""" @@ -368,11 +370,13 @@ async def connect(self, loop: AbstractEventLoop) -> None: try: await self._start_http_server() - logger.info("HTTP Server has connected to port: {}.".format(self.port)) + self.logger.info( + "HTTP Server has connected to port: {}.".format(self.port) + ) except Exception: # pragma: nocover # pylint: disable=broad-except self.is_stopped = True self._in_queue = None - logger.exception( + self.logger.exception( "Failed to start server on {}:{}.".format(self.host, self.port) ) @@ -390,7 +394,7 @@ async def _http_handler(self, http_request: BaseRequest) -> Response: is_valid_request = self.api_spec.verify(request) if not is_valid_request: - logger.warning(f"request is not valid: {request}") + self.logger.warning(f"request is not valid: {request}") return Response(status=NOT_FOUND, reason="Request Not Found") try: @@ -433,7 +437,7 @@ def send(self, envelope: Envelope) -> None: assert self.http_server is not None, "Server not connected, call connect first!" if envelope.protocol_id not in self.restricted_to_protocols: - logger.error( + self.logger.error( "This envelope cannot be sent with the http connection: protocol_id={}".format( envelope.protocol_id ) @@ -443,7 +447,7 @@ def send(self, envelope: Envelope) -> None: future = self.pending_requests.pop(envelope.to, None) if not future: - logger.warning( + self.logger.warning( "Dropping envelope for request id {} which has timed out.".format( envelope.to ) @@ -461,7 +465,7 @@ async def disconnect(self) -> None: if not self.is_stopped: await self.http_server.stop() - logger.info("HTTP Server has shutdown on port: {}.".format(self.port)) + self.logger.info("HTTP Server has shutdown on port: {}.".format(self.port)) self.is_stopped = True self._in_queue = None @@ -494,6 +498,7 @@ async def connect(self) -> None: :return: None """ if not self.connection_status.is_connected: + self.channel.logger = self.logger await self.channel.connect(loop=self.loop) self.connection_status.is_connected = not self.channel.is_stopped diff --git a/packages/fetchai/connections/http_server/connection.yaml b/packages/fetchai/connections/http_server/connection.yaml index 308c398eb5..3359e35da9 100644 --- a/packages/fetchai/connections/http_server/connection.yaml +++ b/packages/fetchai/connections/http_server/connection.yaml @@ -7,7 +7,7 @@ license: Apache-2.0 aea_version: '>=0.5.0, <0.6.0' fingerprint: __init__.py: Qmb6JEAkJeb5JweqrSGiGoQp1vGXqddjGgb9WMkm2phTgA - connection.py: Qmf1GFFhq4LQXLGizrp6nMDy4R7XRoqEayzqaEaxuToVnu + connection.py: QmcMok71MvxwP6gKNT1tcrZk6LuayzJPQ9JAkVstJjfv83 fingerprint_ignore_patterns: [] protocols: - fetchai/http:0.3.0 diff --git a/packages/fetchai/connections/local/connection.py b/packages/fetchai/connections/local/connection.py index 5d2a0a90e5..f963c44dbd 100644 --- a/packages/fetchai/connections/local/connection.py +++ b/packages/fetchai/connections/local/connection.py @@ -368,9 +368,9 @@ async def receive(self, *args, **kwargs) -> Optional["Envelope"]: assert self._reader is not None envelope = await self._reader.get() if envelope is None: - logger.debug("Receiving task terminated.") + self.logger.debug("Receiving task terminated.") return None - logger.debug("Received envelope {}".format(envelope)) + self.logger.debug("Received envelope {}".format(envelope)) return envelope except Exception: # pragma: nocover # pylint: disable=broad-except return None diff --git a/packages/fetchai/connections/local/connection.yaml b/packages/fetchai/connections/local/connection.yaml index 7b28ee2632..e65cff78de 100644 --- a/packages/fetchai/connections/local/connection.yaml +++ b/packages/fetchai/connections/local/connection.yaml @@ -6,7 +6,7 @@ license: Apache-2.0 aea_version: '>=0.5.0, <0.6.0' fingerprint: __init__.py: QmeeoX5E38Ecrb1rLdeFyyxReHLrcJoETnBcPbcNWVbiKG - connection.py: QmarTwASoQC365c6yCydYVB7524ELwJbXfHmh5qUPEEtec + connection.py: QmNdjfh6Qxx9vpV5oKLA9ceymU1mBciqFPUnUdZRQatBoU fingerprint_ignore_patterns: [] protocols: - fetchai/oef_search:0.3.0 diff --git a/packages/fetchai/connections/oef/connection.py b/packages/fetchai/connections/oef/connection.py index b113b2305d..5776a47dd0 100644 --- a/packages/fetchai/connections/oef/connection.py +++ b/packages/fetchai/connections/oef/connection.py @@ -139,6 +139,8 @@ def __init__( self._threaded_pool = ThreadPoolExecutor(self.THREAD_POOL_SIZE) + self.aea_logger = logger + async def _run_in_executor(self, fn, *args): return await self._loop.run_in_executor(self._threaded_pool, fn, *args) @@ -193,7 +195,7 @@ def on_cfp( :return: None """ self._check_loop_and_queue() - logger.warning( + self.aea_logger.warning( "Dropping incompatible on_cfp: msg_id={}, dialogue_id={}, origin={}, target={}, query={}".format( msg_id, dialogue_id, origin, target, query ) @@ -218,7 +220,7 @@ def on_propose( :return: None """ self._check_loop_and_queue() - logger.warning( + self.aea_logger.warning( "Dropping incompatible on_propose: msg_id={}, dialogue_id={}, origin={}, target={}".format( msg_id, dialogue_id, origin, target ) @@ -237,7 +239,7 @@ def on_accept( :return: None """ self._check_loop_and_queue() - logger.warning( + self.aea_logger.warning( "Dropping incompatible on_accept: msg_id={}, dialogue_id={}, origin={}, target={}".format( msg_id, dialogue_id, origin, target ) @@ -256,7 +258,7 @@ def on_decline( :return: None """ self._check_loop_and_queue() - logger.warning( + self.aea_logger.warning( "Dropping incompatible on_decline: msg_id={}, dialogue_id={}, origin={}, target={}".format( msg_id, dialogue_id, origin, target ) @@ -273,13 +275,13 @@ def on_search_result(self, search_id: int, agents: List[Address]) -> None: self._check_loop_and_queue() oef_search_dialogue = self.oef_msg_id_to_dialogue.pop(search_id, None) if oef_search_dialogue is None: - logger.warning( + self.aea_logger.warning( "Could not find dialogue for search_id={}".format(search_id) ) # pragma: nocover return # pragma: nocover last_msg = oef_search_dialogue.last_incoming_message if last_msg is None: - logger.warning("Could not find last message.") # pragma: nocover + self.aea_logger.warning("Could not find last message.") # pragma: nocover return # pragma: nocover msg = OefSearchMessage( performative=OefSearchMessage.Performative.SEARCH_RESULT, @@ -315,13 +317,13 @@ def on_oef_error( operation = OefSearchMessage.OefErrorOperation.OTHER oef_search_dialogue = self.oef_msg_id_to_dialogue.pop(answer_id, None) if oef_search_dialogue is None: - logger.warning( + self.aea_logger.warning( "Could not find dialogue for answer_id={}".format(answer_id) ) # pragma: nocover return # pragma: nocover last_msg = oef_search_dialogue.last_incoming_message if last_msg is None: - logger.warning("Could not find last message.") # pragma: nocover + self.aea_logger.warning("Could not find last message.") # pragma: nocover return # pragma: nocover msg = OefSearchMessage( performative=OefSearchMessage.Performative.OEF_ERROR, @@ -378,7 +380,7 @@ def send(self, envelope: Envelope) -> None: """ if self.excluded_protocols is not None: # pragma: nocover if envelope.protocol_id in self.excluded_protocols: - logger.error( + self.aea_logger.error( "This envelope cannot be sent with the oef connection: protocol_id={}".format( envelope.protocol_id ) @@ -412,7 +414,7 @@ def send_oef_message(self, envelope: Envelope) -> None: OefSearchDialogue, self.oef_search_dialogues.update(oef_message) ) if oef_search_dialogue is None: - logger.warning( + self.aea_logger.warning( "Could not create dialogue for message={}".format(oef_message) ) # pragma: nocover return # pragma: nocover @@ -443,7 +445,7 @@ def handle_failure( # pylint: disable=no-self-use self, exception: Exception, conn ) -> None: """Handle failure.""" - logger.exception(exception) # pragma: nocover + self.aea_logger.exception(exception) # pragma: nocover async def _set_loop_and_queue(self): self._loop = asyncio.get_event_loop() @@ -474,7 +476,9 @@ async def connect(self) -> None: ) if is_connected: return - logger.warning("Cannot connect to OEFChannel. Retrying in 5 seconds...") + self.aea_logger.warning( + "Cannot connect to OEFChannel. Retrying in 5 seconds..." + ) await asyncio.sleep(self.CONNECT_RETRY_DELAY) raise ValueError("Connect attempts limit!") # pragma: nocover @@ -538,6 +542,7 @@ async def connect(self) -> None: if self.connection_status.is_connected: return try: + self.channel.aea_logger = self.logger self.connection_status.is_connecting = True self._loop = asyncio.get_event_loop() await self.channel.connect() @@ -563,12 +568,14 @@ async def _connection_check(self) -> None: if not self.channel.get_state() == "connected": # pragma: no cover self.connection_status.is_connected = False self.connection_status.is_connecting = True - logger.warning( + self.logger.warning( "Lost connection to OEFChannel. Retrying to connect soon ..." ) await self.channel.connect() self.connection_status.is_connected = True - logger.warning("Successfully re-established connection to OEFChannel.") + self.logger.warning( + "Successfully re-established connection to OEFChannel." + ) async def disconnect(self) -> None: """ @@ -595,15 +602,15 @@ async def receive(self, *args, **kwargs) -> Optional["Envelope"]: try: envelope = await self.channel.get() if envelope is None: - logger.debug("Received None.") + self.logger.debug("Received None.") return None - logger.debug("Received envelope: {}".format(envelope)) + self.logger.debug("Received envelope: {}".format(envelope)) return envelope except CancelledError: - logger.debug("Receive cancelled.") + self.logger.debug("Receive cancelled.") return None except Exception as e: # pragma: nocover # pylint: disable=broad-except - logger.exception(e) + self.logger.exception(e) return None async def send(self, envelope: "Envelope") -> None: diff --git a/packages/fetchai/connections/oef/connection.yaml b/packages/fetchai/connections/oef/connection.yaml index 5570305232..97c21279f3 100644 --- a/packages/fetchai/connections/oef/connection.yaml +++ b/packages/fetchai/connections/oef/connection.yaml @@ -7,7 +7,7 @@ license: Apache-2.0 aea_version: '>=0.5.0, <0.6.0' fingerprint: __init__.py: QmUAen8tmoBHuCerjA3FSGKJRLG6JYyUS3chuWzPxKYzez - connection.py: QmPZDreV7zYeTZfgz8iBPUWif5jJaGjjvyYaoEbVkhau2q + connection.py: QmbQmmanASNwhJL5RmRmBgyQRuUtHr9XLs2d8NPN8m4Duh object_translator.py: QmNYd7ikc3nYZMCXjyfen2nENHpNCZws44MNEDbzAsHrGu fingerprint_ignore_patterns: [] protocols: diff --git a/packages/fetchai/connections/p2p_client/connection.py b/packages/fetchai/connections/p2p_client/connection.py index fd9e53c2bc..65aa756b6b 100644 --- a/packages/fetchai/connections/p2p_client/connection.py +++ b/packages/fetchai/connections/p2p_client/connection.py @@ -63,7 +63,8 @@ def __init__( self.thread = Thread(target=self.receiving_loop) self.lock = threading.Lock() self.stopped = True - logger.info("Initialised the peer to peer channel") + self.logger = logger + self.logger.info("Initialised the peer to peer channel") def connect(self): """ @@ -78,18 +79,18 @@ def connect(self): ) self.stopped = False self.thread.start() - logger.debug("P2P Channel is connected.") + self.logger.debug("P2P Channel is connected.") self.try_register() def try_register(self) -> bool: """Try to register to the provider.""" try: assert self._httpCall is not None - logger.info(self.address) + self.logger.info(self.address) query = self._httpCall.register(sender_address=self.address, mailbox=True) return query["status"] == "OK" except Exception: # pragma: no cover - logger.warning("Could not register to the provider.") + self.logger.warning("Could not register to the provider.") raise AEAConnectionError() def send(self, envelope: Envelope) -> None: @@ -103,7 +104,7 @@ def send(self, envelope: Envelope) -> None: if self.excluded_protocols is not None: if envelope.protocol_id in self.excluded_protocols: # pragma: nocover - logger.error( + self.logger.error( "This envelope cannot be sent with the oef connection: protocol_id={}".format( envelope.protocol_id ) @@ -128,7 +129,7 @@ def receiving_loop(self) -> None: sender_address=self.address ) # type: List[Dict[str, Any]] for message in messages: - logger.debug("Received message: {}".format(message)) + self.logger.debug("Received message: {}".format(message)) envelope = Envelope( to=message["TO"]["RECEIVER_ADDRESS"], sender=message["FROM"]["SENDER_ADDRESS"], @@ -137,7 +138,7 @@ def receiving_loop(self) -> None: ) self.loop.call_soon_threadsafe(self.in_queue.put_nowait, envelope) time.sleep(0.5) - logger.debug("Receiving loop stopped.") + self.logger.debug("Receiving loop stopped.") def disconnect(self) -> None: """ @@ -174,6 +175,7 @@ async def connect(self) -> None: :return: None """ if not self.connection_status.is_connected: + self.channel.logger = self.logger self.connection_status.is_connected = True self.channel.in_queue = asyncio.Queue() self.channel.loop = self.loop diff --git a/packages/fetchai/connections/p2p_client/connection.yaml b/packages/fetchai/connections/p2p_client/connection.yaml index 57cb12100c..2b47fa9e2b 100644 --- a/packages/fetchai/connections/p2p_client/connection.yaml +++ b/packages/fetchai/connections/p2p_client/connection.yaml @@ -7,7 +7,7 @@ license: Apache-2.0 aea_version: '>=0.5.0, <0.6.0' fingerprint: __init__.py: QmdwnPo8iC2uqf9CmB4ocbh6HP2jcgCtuFdS4djuajp6Li - connection.py: QmUbUbv9xVM9r9GaND4KNgFPzQwnujEUcTEZWvsAiTvzGY + connection.py: Qme6aWroPfVPJh1E8VizbYmF5mwLFdq4nY7bmmLtjSE1kk fingerprint_ignore_patterns: [] protocols: [] class_name: PeerToPeerConnection diff --git a/packages/fetchai/connections/p2p_libp2p/connection.py b/packages/fetchai/connections/p2p_libp2p/connection.py index c16bf5a3d6..c4fd3cfb0e 100644 --- a/packages/fetchai/connections/p2p_libp2p/connection.py +++ b/packages/fetchai/connections/p2p_libp2p/connection.py @@ -255,6 +255,8 @@ def __init__( self._reader_protocol = None # type: Optional[asyncio.StreamReaderProtocol] self._fileobj = None # type: Optional[IO[str]] + self.logger = logger + @property def reader_protocol(self) -> asyncio.StreamReaderProtocol: """Get reader protocol.""" @@ -279,11 +281,11 @@ async def start(self) -> None: # build the node # TOFIX(LR) fix async version - logger.info("Downloading golang dependencies. This may take a while...") + self.logger.info("Downloading golang dependencies. This may take a while...") proc = _golang_module_build(self.source, self._log_file_desc) proc.wait() with open(self.log_file, "r") as f: - logger.debug(f.read()) + self.logger.debug(f.read()) node_log = "" with open(self.log_file, "r") as f: node_log = f.read() @@ -293,12 +295,12 @@ async def start(self) -> None: proc.returncode, node_log ) ) - logger.info("Finished downloading golang dependencies.") + self.logger.info("Finished downloading golang dependencies.") # setup fifos in_path = self.libp2p_to_aea_path out_path = self.aea_to_libp2p_path - logger.debug("Creating pipes ({}, {})...".format(in_path, out_path)) + self.logger.debug("Creating pipes ({}, {})...".format(in_path, out_path)) if os.path.exists(in_path): os.remove(in_path) if os.path.exists(out_path): @@ -339,12 +341,12 @@ async def start(self) -> None: ) # run node - logger.info("Starting libp2p node...") + self.logger.info("Starting libp2p node...") self.proc = _golang_module_run( self.source, LIBP2P_NODE_MODULE_NAME, [self.env_file], self._log_file_desc ) - logger.info("Connecting to libp2p node...") + self.logger.info("Connecting to libp2p node...") await self._connect() async def _connect(self) -> None: @@ -355,13 +357,13 @@ async def _connect(self) -> None: """ if self._connection_attempts == 1: with open(self.log_file, "r") as f: - logger.debug("Couldn't connect to libp2p p2p process, logs:") - logger.debug(f.read()) + self.logger.debug("Couldn't connect to libp2p p2p process, logs:") + self.logger.debug(f.read()) raise Exception("Couldn't connect to libp2p p2p process") # TOFIX(LR) use proper exception self._connection_attempts -= 1 - logger.debug( + self.logger.debug( "Attempt opening pipes {}, {}...".format( self.libp2p_to_aea_path, self.aea_to_libp2p_path ) @@ -396,9 +398,9 @@ async def _connect(self) -> None: self._fileobj = os.fdopen(self._libp2p_to_aea, "r") await self._loop.connect_read_pipe(lambda: self.reader_protocol, self._fileobj) - logger.info("Successfully connected to libp2p node!") + self.logger.info("Successfully connected to libp2p node!") self.multiaddrs = self.get_libp2p_node_multiaddrs() - logger.info("My libp2p addresses: {}".format(self.multiaddrs)) + self.logger.info("My libp2p addresses: {}".format(self.multiaddrs)) @asyncio.coroutine def write(self, data: bytes) -> None: @@ -422,7 +424,7 @@ async def read(self) -> Optional[bytes]: self._stream_reader is not None ), "StreamReader not set, call connect first!" try: - logger.debug("Waiting for messages...") + self.logger.debug("Waiting for messages...") buf = await self._stream_reader.readexactly(4) if not buf: return None @@ -432,7 +434,7 @@ async def read(self) -> Optional[bytes]: return None return data except asyncio.streams.IncompleteReadError as e: - logger.info( + self.logger.info( "Connection disconnected while reading from node ({}/{})".format( len(e.partial), e.expected ) @@ -477,14 +479,14 @@ def stop(self) -> None: """ # TOFIX(LR) wait is blocking and proc can ignore terminate if self.proc is not None: - logger.debug("Terminating node process {}...".format(self.proc.pid)) + self.logger.debug("Terminating node process {}...".format(self.proc.pid)) self.proc.terminate() - logger.debug( + self.logger.debug( "Waiting for node process {} to terminate...".format(self.proc.pid) ) self.proc.wait() else: - logger.debug("Called stop when process not set!") + self.logger.debug("Called stop when process not set!") if os.path.exists(LIBP2P_NODE_ENV_FILE): os.remove(LIBP2P_NODE_ENV_FILE) @@ -605,6 +607,7 @@ async def connect(self) -> None: try: # start libp2p node self.connection_status.is_connecting = True + self.node.logger = self.logger await self.node.start() self.connection_status.is_connecting = False self.connection_status.is_connected = True @@ -638,7 +641,7 @@ async def disconnect(self) -> None: if self._in_queue is not None: self._in_queue.put_nowait(None) else: - logger.debug("Called disconnect when input queue not initialized.") + self.logger.debug("Called disconnect when input queue not initialized.") async def receive(self, *args, **kwargs) -> Optional["Envelope"]: """ @@ -650,18 +653,18 @@ async def receive(self, *args, **kwargs) -> Optional["Envelope"]: assert self._in_queue is not None, "Input queue not initialized." data = await self._in_queue.get() if data is None: - logger.debug("Received None.") + self.logger.debug("Received None.") self.node.stop() self.connection_status.is_connected = False return None # TOFIX(LR) attempt restarting the node? - logger.debug("Received data: {}".format(data)) + self.logger.debug("Received data: {}".format(data)) return Envelope.decode(data) except CancelledError: - logger.debug("Receive cancelled.") + self.logger.debug("Receive cancelled.") return None except Exception as e: # pragma: nocover # pylint: disable=broad-except - logger.exception(e) + self.logger.exception(e) return None async def send(self, envelope: Envelope): diff --git a/packages/fetchai/connections/p2p_libp2p/connection.yaml b/packages/fetchai/connections/p2p_libp2p/connection.yaml index dcecbf2625..584b604b79 100644 --- a/packages/fetchai/connections/p2p_libp2p/connection.yaml +++ b/packages/fetchai/connections/p2p_libp2p/connection.yaml @@ -11,7 +11,7 @@ fingerprint: aea/api.go: QmW5fUpVZmV3pxgoakm3RvsvCGC6FwT2XprcqXHM8rBXP5 aea/envelope.pb.go: QmRfUNGpCeVJfsW3H1MzCN4pwDWgumfyWufVFp6xvUjjug aea/envelope.proto: QmSC8EGCKiNFR2vf5bSWymSzYDFMipQW9aQVMwPzQoKb4n - connection.py: QmSxxjTeuWQoZtZrjN6bo8tmT61yxSBrRrJo9FLCgbUvKt + connection.py: QmWF3VWbmqdCSv1igyHmmZnsgRWKfjnet78JSYCeDaYzsA dht/dhtclient/dhtclient.go: Qma8rpw5wLUsqX1Qvengb1Da3KFB12ML1rZ8NGM5ZGZMar dht/dhtclient/dhtclient_test.go: QmdpspLKA9HXc56HVMcP36ikBpHrztWHJ6wWqoU6UnR6BM dht/dhtclient/options.go: QmPorj38wNrxGrzsbFe5wwLmiHzxbTJ2VsgvSd8tLDYS8s diff --git a/packages/fetchai/connections/p2p_libp2p_client/connection.py b/packages/fetchai/connections/p2p_libp2p_client/connection.py index 1e612cbcf5..ab8c3a4964 100644 --- a/packages/fetchai/connections/p2p_libp2p_client/connection.py +++ b/packages/fetchai/connections/p2p_libp2p_client/connection.py @@ -165,7 +165,7 @@ async def connect(self) -> None: self.connection_status.is_connecting = False self.connection_status.is_connected = True - logger.info( + self.logger.info( "Successfully connected to libp2p node {}".format(str(self.node_uri)) ) @@ -202,7 +202,7 @@ async def disconnect(self) -> None: # TOFIX(LR) mypy issue https://github.com/python/mypy/issues/8546 # self._process_messages_task = None - logger.debug("disconnecting libp2p client connection...") + self.logger.debug("disconnecting libp2p client connection...") self._writer.write_eof() await self._writer.drain() self._writer.close() @@ -212,7 +212,7 @@ async def disconnect(self) -> None: if self._in_queue is not None: self._in_queue.put_nowait(None) else: - logger.debug("Called disconnect when input queue not initialized.") + self.logger.debug("Called disconnect when input queue not initialized.") async def receive(self, *args, **kwargs) -> Optional["Envelope"]: """ @@ -224,7 +224,7 @@ async def receive(self, *args, **kwargs) -> Optional["Envelope"]: assert self._in_queue is not None, "Input queue not initialized." data = await self._in_queue.get() if data is None: - logger.debug("Received None.") + self.logger.debug("Received None.") if ( self._connection_status.is_connected or self._connection_status.is_connecting @@ -232,13 +232,13 @@ async def receive(self, *args, **kwargs) -> Optional["Envelope"]: await self.disconnect() return None # TOFIX(LR) attempt restarting the node? - logger.debug("Received data: {}".format(data)) + self.logger.debug("Received data: {}".format(data)) return Envelope.decode(data) except CancelledError: - logger.debug("Receive cancelled.") + self.logger.debug("Receive cancelled.") return None except Exception as e: # pragma: nocover # pylint: disable=broad-except - logger.exception(e) + self.logger.exception(e) return None async def send(self, envelope: Envelope): @@ -272,7 +272,7 @@ async def _send(self, data: bytes) -> None: async def _receive(self) -> Optional[bytes]: assert self._reader is not None try: - logger.debug("Waiting for messages...") + self.logger.debug("Waiting for messages...") buf = await self._reader.readexactly(4) if not buf: return None @@ -282,7 +282,7 @@ async def _receive(self) -> Optional[bytes]: return None return data except asyncio.streams.IncompleteReadError as e: - logger.info( + self.logger.info( "Connection disconnected while reading from node ({}/{})".format( len(e.partial), e.expected ) diff --git a/packages/fetchai/connections/p2p_libp2p_client/connection.yaml b/packages/fetchai/connections/p2p_libp2p_client/connection.yaml index 3b4ea52a50..d5c7efded4 100644 --- a/packages/fetchai/connections/p2p_libp2p_client/connection.yaml +++ b/packages/fetchai/connections/p2p_libp2p_client/connection.yaml @@ -8,7 +8,7 @@ license: Apache-2.0 aea_version: '>=0.5.0, <0.6.0' fingerprint: __init__.py: QmT1FEHkPGMHV5oiVEfQHHr25N2qdZxydSNRJabJvYiTgf - connection.py: QmT9ncNDy27GXAqtmJJDFQep2M8Qn7ycih7E8tMT2PwS3i + connection.py: QmP2fEZb7H9grCfPZVAf5WyY7Km9uoPFM84WQvnsBj3XQ8 fingerprint_ignore_patterns: [] protocols: [] class_name: P2PLibp2pClientConnection diff --git a/packages/fetchai/connections/soef/connection.py b/packages/fetchai/connections/soef/connection.py index 4385d7faaf..ce42a348aa 100644 --- a/packages/fetchai/connections/soef/connection.py +++ b/packages/fetchai/connections/soef/connection.py @@ -792,15 +792,15 @@ async def receive(self, *args, **kwargs) -> Optional["Envelope"]: assert self.in_queue is not None envelope = await self.in_queue.get() if envelope is None: # pragma: nocover - logger.debug("Received None.") + self.logger.debug("Received None.") return None - logger.debug("Received envelope: {}".format(envelope)) + self.logger.debug("Received envelope: {}".format(envelope)) return envelope except CancelledError: - logger.debug("Receive cancelled.") + self.logger.debug("Receive cancelled.") return None except Exception as e: # pragma: nocover # pylint: disable=broad-except - logger.exception(e) + self.logger.exception(e) return None async def send(self, envelope: "Envelope") -> None: diff --git a/packages/fetchai/connections/soef/connection.yaml b/packages/fetchai/connections/soef/connection.yaml index 66d148eada..4437276e1e 100644 --- a/packages/fetchai/connections/soef/connection.yaml +++ b/packages/fetchai/connections/soef/connection.yaml @@ -6,7 +6,7 @@ license: Apache-2.0 aea_version: '>=0.5.0, <0.6.0' fingerprint: __init__.py: Qmd5VBGFJHXFe1H45XoUh5mMSYBwvLSViJuGFeMgbPdQts - connection.py: QmdMjNwXran9ZeCqHhigU1HQeP7iHiSdFxddnhhtSk5Q96 + connection.py: QmTijwR5fVHdPUveLHnauZ6WuZHTzMhNh6nGoaCsbmWi8u fingerprint_ignore_patterns: [] protocols: - fetchai/oef_search:0.3.0 diff --git a/packages/fetchai/connections/tcp/base.py b/packages/fetchai/connections/tcp/base.py index 7a240b8545..e518e2eb89 100644 --- a/packages/fetchai/connections/tcp/base.py +++ b/packages/fetchai/connections/tcp/base.py @@ -76,14 +76,14 @@ async def connect(self): :raises ConnectionError: if a problem occurred during the connection. """ if self.connection_status.is_connected: - logger.warning("Connection already set up.") + self.logger.warning("Connection already set up.") return try: await self.setup() self.connection_status.is_connected = True except Exception as e: # pragma: nocover # pylint: disable=broad-except - logger.error(str(e)) + self.logger.error(str(e)) self.connection_status.is_connected = False async def disconnect(self) -> None: @@ -93,7 +93,7 @@ async def disconnect(self) -> None: :return: None. """ if not self.connection_status.is_connected: - logger.warning("Connection already disconnected.") + self.logger.warning("Connection already disconnected.") return await self.teardown() @@ -113,9 +113,9 @@ async def _recv(self, reader: StreamReader) -> Optional[bytes]: return data async def _send(self, writer, data): - logger.debug("[{}] Send a message".format(self.address)) + self.logger.debug("[{}] Send a message".format(self.address)) nbytes = struct.pack("I", len(data)) - logger.debug("#bytes: {!r}".format(nbytes)) + self.logger.debug("#bytes: {!r}".format(nbytes)) try: writer.write(nbytes) writer.write(data) @@ -135,4 +135,6 @@ async def send(self, envelope: Envelope) -> None: data = envelope.encode() await self._send(writer, data) else: - logger.error("[{}]: Cannot send envelope {}".format(self.address, envelope)) + self.logger.error( + "[{}]: Cannot send envelope {}".format(self.address, envelope) + ) diff --git a/packages/fetchai/connections/tcp/connection.yaml b/packages/fetchai/connections/tcp/connection.yaml index 18349bd1b2..a6d0ef06f7 100644 --- a/packages/fetchai/connections/tcp/connection.yaml +++ b/packages/fetchai/connections/tcp/connection.yaml @@ -6,10 +6,10 @@ license: Apache-2.0 aea_version: '>=0.5.0, <0.6.0' fingerprint: __init__.py: QmTxAtQ9ffraStxxLAkvmWxyGhoV3jE16Sw6SJ9xzTthLb - base.py: QmQhr6wYYc79LvdBWwKUqTwn1Qwr8KyQEWTz9uZxzuBGpE + base.py: Qme1DePz5749LJ9wuwwfNE9PUxrzFgZxqRAwUN5qMZ9Eox connection.py: QmP5Hei7U1iqcHqFDLzS1sKu6jcsBKvEi3udQussrePN3X - tcp_client.py: QmeBe8E9zofdzochVRJLg6m5CmNprnCSTmW3NqUYp49pEL - tcp_server.py: QmY7TRJnBiut6BJqpgYuwQvjHRG3xLePjxKDw7ffzr16Vc + tcp_client.py: QmTXs6z3rvxB59FmGuu46CeY1eHRPBNQ4CPZm1y7hRpusp + tcp_server.py: QmPP89GpcQpc8Ptfdgf74tgTuqWmLDRXQ6vA8rwWDtiiwh fingerprint_ignore_patterns: [] protocols: [] class_name: TCPClientConnection diff --git a/packages/fetchai/connections/tcp/tcp_client.py b/packages/fetchai/connections/tcp/tcp_client.py index 4ce343f1b8..906c50bb0c 100644 --- a/packages/fetchai/connections/tcp/tcp_client.py +++ b/packages/fetchai/connections/tcp/tcp_client.py @@ -34,7 +34,7 @@ from packages.fetchai.connections.tcp.base import TCPConnection -logger = logging.getLogger("aea.packages.fetchai.connections.tcp_client") +logger = logging.getLogger("aea.packages.fetchai.connections.tcp.tcp_client") STUB_DIALOGUE_ID = 0 @@ -86,20 +86,22 @@ async def receive(self, *args, **kwargs) -> Optional["Envelope"]: assert self._reader is not None data = await self._recv(self._reader) if data is None: # pragma: nocover - logger.debug("[{}] No data received.".format(self.address)) + self.logger.debug("[{}] No data received.".format(self.address)) return None - logger.debug("[{}] Message received: {!r}".format(self.address, data)) + self.logger.debug("[{}] Message received: {!r}".format(self.address, data)) envelope = Envelope.decode(data) # TODO handle decoding error - logger.debug("[{}] Decoded envelope: {}".format(self.address, envelope)) + self.logger.debug( + "[{}] Decoded envelope: {}".format(self.address, envelope) + ) return envelope except CancelledError: - logger.debug("[{}] Read cancelled.".format(self.address)) + self.logger.debug("[{}] Read cancelled.".format(self.address)) return None except struct.error as e: - logger.debug("Struct error: {}".format(str(e))) + self.logger.debug("Struct error: {}".format(str(e))) return None except Exception as e: - logger.exception(e) + self.logger.exception(e) raise def select_writer_from_envelope(self, envelope: Envelope) -> Optional[StreamWriter]: diff --git a/packages/fetchai/connections/tcp/tcp_server.py b/packages/fetchai/connections/tcp/tcp_server.py index 598380fdb6..f8ba05a575 100644 --- a/packages/fetchai/connections/tcp/tcp_server.py +++ b/packages/fetchai/connections/tcp/tcp_server.py @@ -60,12 +60,12 @@ async def handle(self, reader: StreamReader, writer: StreamWriter) -> None: :param writer: the stream writer. :return: None """ - logger.debug("Waiting for client address...") + self.logger.debug("Waiting for client address...") address_bytes = await self._recv(reader) if address_bytes is not None: address_bytes = cast(bytes, address_bytes) address = address_bytes.decode("utf-8") - logger.debug("Public key of the client: {}".format(address)) + self.logger.debug("Public key of the client: {}".format(address)) self.connections[address] = (reader, writer) read_task = asyncio.ensure_future(self._recv(reader), loop=self._loop) self._read_tasks_to_address[read_task] = address @@ -77,20 +77,20 @@ async def receive(self, *args, **kwargs) -> Optional["Envelope"]: :return: the received envelope, or None if an error occurred. """ if len(self._read_tasks_to_address) == 0: - logger.warning( + self.logger.warning( "Tried to read from the TCP server. However, there is no open connection to read from." ) return None try: - logger.debug("Waiting for incoming messages...") + self.logger.debug("Waiting for incoming messages...") done, _ = await asyncio.wait(self._read_tasks_to_address.keys(), return_when=asyncio.FIRST_COMPLETED) # type: ignore # take the first task = next(iter(done)) envelope_bytes = task.result() if envelope_bytes is None: # pragma: no cover - logger.debug("[{}]: No data received.") + self.logger.debug("[{}]: No data received.") return None envelope = Envelope.decode(envelope_bytes) address = self._read_tasks_to_address.pop(task) @@ -99,10 +99,10 @@ async def receive(self, *args, **kwargs) -> Optional["Envelope"]: self._read_tasks_to_address[new_task] = address return envelope except asyncio.CancelledError: - logger.debug("Receiving loop cancelled.") + self.logger.debug("Receiving loop cancelled.") return None except Exception as e: # pragma: nocover # pylint: disable=broad-except - logger.error("Error in the receiving loop: {}".format(str(e))) + self.logger.error("Error in the receiving loop: {}".format(str(e))) return None async def setup(self): @@ -110,7 +110,7 @@ async def setup(self): self._server = await asyncio.start_server( self.handle, host=self.host, port=self.port ) - logger.debug("Start listening on {}:{}".format(self.host, self.port)) + self.logger.debug("Start listening on {}:{}".format(self.host, self.port)) async def teardown(self): """Tear the connection down.""" diff --git a/packages/fetchai/connections/webhook/connection.py b/packages/fetchai/connections/webhook/connection.py index ca759ec54a..fd42179cf6 100644 --- a/packages/fetchai/connections/webhook/connection.py +++ b/packages/fetchai/connections/webhook/connection.py @@ -78,7 +78,8 @@ def __init__( self.connection_id = connection_id self.in_queue = None # type: Optional[asyncio.Queue] # pragma: no cover - logger.info("Initialised a webhook channel") + self.logger = logger + self.logger.info("Initialised a webhook channel") async def connect(self) -> None: """ @@ -211,6 +212,7 @@ async def connect(self) -> None: """ if not self.connection_status.is_connected: self.connection_status.is_connected = True + self.channel.logger = self.logger self.channel.in_queue = asyncio.Queue() await self.channel.connect() diff --git a/packages/fetchai/connections/webhook/connection.yaml b/packages/fetchai/connections/webhook/connection.yaml index fb696dccd9..8ec4a1b12b 100644 --- a/packages/fetchai/connections/webhook/connection.yaml +++ b/packages/fetchai/connections/webhook/connection.yaml @@ -6,7 +6,7 @@ license: Apache-2.0 aea_version: '>=0.5.0, <0.6.0' fingerprint: __init__.py: QmWUKSmXaBgGMvKgdmzKmMjCx43BnrfW6og2n3afNoAALq - connection.py: QmZuRpeuoa1sx5UTZtVsYh5RqnyreoinhTP2jXXVHzy3A6 + connection.py: QmP8HkkVmpez6pCbLpddzzuhMoxiDaoj3ptaAZEPv3WdAK fingerprint_ignore_patterns: [] protocols: - fetchai/http:0.3.0 diff --git a/packages/hashes.csv b/packages/hashes.csv index 91d641df00..8b834a6b48 100644 --- a/packages/hashes.csv +++ b/packages/hashes.csv @@ -18,21 +18,21 @@ fetchai/agents/thermometer_aea,QmXwmPDtZ3Q7t5u3k1ounzDg5rtFD4vsTBTH43UGrmbdvq fetchai/agents/thermometer_client,QmRMKu9hAzSZQyuSPGg9umQGDRrq1miwrVKo7SFMKDqQV4 fetchai/agents/weather_client,Qmah4VhqdoH6k95xUZk9VREjG4iX5drKvUj2cypiAugoXK fetchai/agents/weather_station,QmfD44aXS4TmcZFMASb8vDxYK6eNFsQMkSTBmTdcqzGPhc -fetchai/connections/gym,QmZNEJvgi9n5poswQrHav3fvSv5vA1nbxxkTzWENCoCdrc -fetchai/connections/http_client,QmXQrA6gA4hMEMkMQsEp1MQwDEqRw5BnnqR4gCrP5xqVD2 -fetchai/connections/http_server,QmPMSyX1iaWM7mWqFtW8LnSyR9r88RzYbGtyYmopT6tshC +fetchai/connections/gym,QmXpTer28dVvxeXqsXzaBqX551QToh9w5KJC2oXcStpKJG +fetchai/connections/http_client,Qmaskbh2Qw1S4dH7HwTzBYf9jAYPTUm9XLQQtg2CBLYCnC +fetchai/connections/http_server,QmfFjnbBbUCg5yEF4J4A7TCH8ePoEsQ6dYDCvdxoXt8Fu2 fetchai/connections/ledger,QmWDietq5YFkZAYpyyrmq7AFbTVN4skDWKsG4CXrc5uZ37 -fetchai/connections/local,QmVcTEJxGbWbtXi2fLN5eJA6XuEAneaNd83UJPugrtb9xU -fetchai/connections/oef,QmfHVVcwUb8SqGYHs51iH5ymK5xJvxCCtShEmc9cw4FNVZ -fetchai/connections/p2p_client,QmbwCDuAB1eq6JikqeAAqpqjVhxevGNeWCLqRD67Uvqiaz -fetchai/connections/p2p_libp2p,QmdFoDC26e94ACZB2nVLTyoSMDwKGuyupB3WJhfZ2Mi3Bk -fetchai/connections/p2p_libp2p_client,QmVhsh863k3ws4HeDpkZm7GQkrW3aMREu5sLkHATmwCddC +fetchai/connections/local,QmbJHrPmvoaFU9bAMJA8t4VPUf7fmYce7PDyfgUqXyXJqa +fetchai/connections/oef,QmXnb9yBD3eAkyZGVSwxmze4VQJkTrTVm6E6Ngz7BmQeNE +fetchai/connections/p2p_client,QmcGpe9RFtk18JARZjue2zA1CZCCocj8jEP8toUdddFyzo +fetchai/connections/p2p_libp2p,Qmak5kzJwZBgPcmdLJqL1pyUJogmdyntY3mR3it8xmmtnv +fetchai/connections/p2p_libp2p_client,QmfAuTGyrxJm7igAAEiH1vxvfVo7UAir6KmSLANov25gne fetchai/connections/p2p_stub,QmSBRr26YELdbYk9nAurw3XdQ3Myj7cVgCDZZMv7DMrsdg fetchai/connections/scaffold,QmTzEeEydjohZNTsAJnoGMtzTgCyzMBQCYgbTBLfqWtw5w -fetchai/connections/soef,QmYQ6YCwtJdqzb1anJbVr5sZ96UUdnjMRpjqa2DgVHzfPi +fetchai/connections/soef,QmQrY8jDgydNCeJRNV9QdCT1WF4nKK2iHUiVby7QiwXH7t fetchai/connections/stub,QmWP6tgcttnUY86ynAseyHuuFT85edT31QPSyideVveiyj -fetchai/connections/tcp,QmVhT3tfZXDGkXUhhpEFwKqtPPQjCdDY3YtRHw9AWyHzhx -fetchai/connections/webhook,QmZ3vofEwRBZPvMCxLVanSnsewXTdK5nHyWiDWjzFUbTRy +fetchai/connections/tcp,QmYCACebYtvE4fD6hT2TpJ7aqC9pBh4HajrjEZPcjtcHgV +fetchai/connections/webhook,QmbHMrLLfNQKAseu1YTHoU91W2GfmgBcnyiFYee2CSuKKk fetchai/contracts/erc1155,QmPEae32YqmCmB7nAzoLokosvnu3u8ZN75xouzZEBvE5zM fetchai/contracts/scaffold,Qme97drP4cwCyPs3zV6WaLz9K7c5ZWRtSWQ25hMUmMjFgo fetchai/protocols/contract_api,QmcveAM85xPuhv2Dmo63adnhh5zgFVjPpPYQFEtKWxXvKj diff --git a/tests/test_packages/test_connections/test_oef/test_communication.py b/tests/test_packages/test_connections/test_oef/test_communication.py index 136a4967fc..715b4b3699 100644 --- a/tests/test_packages/test_connections/test_oef/test_communication.py +++ b/tests/test_packages/test_connections/test_oef/test_communication.py @@ -53,7 +53,6 @@ from aea.protocols.default.message import DefaultMessage from aea.test_tools.test_cases import UseOef -import packages from packages.fetchai.connections.oef.connection import OEFObjectTranslator from packages.fetchai.protocols.fipa import fipa_pb2 from packages.fetchai.protocols.fipa.message import FipaMessage @@ -1063,7 +1062,7 @@ async def test_send_oef_message(self, pytestconfig): await oef_connection.disconnect() @pytest.mark.asyncio - async def test_cancelled_receive(self, pytestconfig): + async def test_cancelled_receive(self, pytestconfig, caplog): """Test the case when a receive request is cancelled.""" oef_connection = _make_oef_connection( address=FETCHAI_ADDRESS_ONE, oef_addr="127.0.0.1", oef_port=10000, @@ -1071,21 +1070,18 @@ async def test_cancelled_receive(self, pytestconfig): oef_connection.loop = asyncio.get_event_loop() await oef_connection.connect() - patch = unittest.mock.patch.object( - packages.fetchai.connections.oef.connection.logger, "debug" - ) - mocked_logger_debug = patch.start() + with caplog.at_level(logging.DEBUG, "aea.packages.fetchai.connections.oef"): - async def receive(): - await oef_connection.receive() + async def receive(): + await oef_connection.receive() - task = asyncio.ensure_future(receive(), loop=asyncio.get_event_loop()) - await asyncio.sleep(0.1) - task.cancel() - await asyncio.sleep(0.1) - await oef_connection.disconnect() + task = asyncio.ensure_future(receive(), loop=asyncio.get_event_loop()) + await asyncio.sleep(0.1) + task.cancel() + await asyncio.sleep(0.1) + await oef_connection.disconnect() - mocked_logger_debug.assert_called_once_with("Receive cancelled.") + assert "Receive cancelled." in caplog.text @pytest.mark.asyncio async def test_exception_during_receive(self, pytestconfig): @@ -1125,7 +1121,7 @@ async def test_connecting_twice_is_ok(self, pytestconfig): @pytest.mark.skipif( sys.version_info < (3, 7), reason="Python version < 3.7 not supported by the OEF." ) -async def test_cannot_connect_to_oef(): +async def test_cannot_connect_to_oef(caplog): """Test the case when we can't connect to the OEF.""" oef_connection = _make_oef_connection( address=FETCHAI_ADDRESS_ONE, @@ -1133,20 +1129,16 @@ async def test_cannot_connect_to_oef(): oef_port=61234, # use addr instead of hostname to avoid name resolution ) - patch = unittest.mock.patch.object( - packages.fetchai.connections.oef.connection.logger, "warning" - ) - mocked_logger_warning = patch.start() + with caplog.at_level(logging.DEBUG, logger="aea.packages.fetchai.connections.oef"): - task = asyncio.ensure_future( - oef_connection.connect(), loop=asyncio.get_event_loop() - ) - await asyncio.sleep(3.0) - mocked_logger_warning.assert_called_with( - "Cannot connect to OEFChannel. Retrying in 5 seconds..." - ) - await cancel_and_wait(task) - await oef_connection.disconnect() + task = asyncio.ensure_future( + oef_connection.connect(), loop=asyncio.get_event_loop() + ) + await asyncio.sleep(3.0) + assert "Cannot connect to OEFChannel. Retrying in 5 seconds..." in caplog.text + + await cancel_and_wait(task) + await oef_connection.disconnect() @pytest.mark.asyncio diff --git a/tests/test_packages/test_connections/test_tcp/test_base.py b/tests/test_packages/test_connections/test_tcp/test_base.py index 6bb0d5d53a..4f7b1bcb03 100644 --- a/tests/test_packages/test_connections/test_tcp/test_base.py +++ b/tests/test_packages/test_connections/test_tcp/test_base.py @@ -20,6 +20,7 @@ """This module contains the tests for the TCP base module.""" import asyncio +import logging import unittest.mock from asyncio import CancelledError @@ -28,8 +29,6 @@ from aea.mail.base import Envelope from aea.protocols.default.message import DefaultMessage -import packages - from tests.conftest import ( _make_tcp_client_connection, _make_tcp_server_connection, @@ -38,7 +37,7 @@ @pytest.mark.asyncio -async def test_connect_twice(): +async def test_connect_twice(caplog): """Test that connecting twice the tcp connection works correctly.""" port = get_unused_tcp_port() tcp_connection = _make_tcp_server_connection("address", "127.0.0.1", port) @@ -48,17 +47,15 @@ async def test_connect_twice(): await tcp_connection.connect() await asyncio.sleep(0.1) - with unittest.mock.patch.object( - packages.fetchai.connections.tcp.base.logger, "warning" - ) as mock_logger_warning: + with caplog.at_level(logging.WARNING, "aea.packages.fetchai.connections.tcp"): await tcp_connection.connect() - mock_logger_warning.assert_called_with("Connection already set up.") + assert "Connection already set up." in caplog.text await tcp_connection.disconnect() @pytest.mark.asyncio -async def test_connect_raises_exception(): +async def test_connect_raises_exception(caplog): """Test the case that a connection attempt raises an exception.""" port = get_unused_tcp_port() tcp_connection = _make_tcp_server_connection("address", "127.0.0.1", port) @@ -66,31 +63,27 @@ async def test_connect_raises_exception(): loop = asyncio.get_event_loop() tcp_connection.loop = loop - with unittest.mock.patch.object( - packages.fetchai.connections.tcp.base.logger, "error" - ) as mock_logger_error: + with caplog.at_level(logging.ERROR, "packages.fetchai.connections.tcp.base.logger"): with unittest.mock.patch.object( tcp_connection, "setup", side_effect=Exception("error during setup") ): await tcp_connection.connect() - mock_logger_error.assert_called_with("error during setup") + assert "error during setup" in caplog.text @pytest.mark.asyncio -async def test_disconnect_when_already_disconnected(): +async def test_disconnect_when_already_disconnected(caplog): """Test that disconnecting a connection already disconnected works correctly.""" port = get_unused_tcp_port() tcp_connection = _make_tcp_server_connection("address", "127.0.0.1", port) - with unittest.mock.patch.object( - packages.fetchai.connections.tcp.base.logger, "warning" - ) as mock_logger_warning: + with caplog.at_level(logging.WARNING, "aea.packages.fetchai.connections.tcp"): await tcp_connection.disconnect() - mock_logger_warning.assert_called_with("Connection already disconnected.") + assert "Connection already disconnected." in caplog.text @pytest.mark.asyncio -async def test_send_to_unknown_destination(): +async def test_send_to_unknown_destination(caplog): """Test that a message to an unknown destination logs an error.""" address = "address" port = get_unused_tcp_port() @@ -101,13 +94,9 @@ async def test_send_to_unknown_destination(): protocol_id=DefaultMessage.protocol_id, message=b"", ) - with unittest.mock.patch.object( - packages.fetchai.connections.tcp.base.logger, "error" - ) as mock_logger_error: + with caplog.at_level(logging.ERROR, "aea.packages.fetchai.connections.tcp"): await tcp_connection.send(envelope) - mock_logger_error.assert_called_with( - "[{}]: Cannot send envelope {}".format(address, envelope) - ) + assert "[{}]: Cannot send envelope {}".format(address, envelope) in caplog.text @pytest.mark.asyncio diff --git a/tests/test_packages/test_connections/test_tcp/test_communication.py b/tests/test_packages/test_connections/test_tcp/test_communication.py index f85883e5b8..1a4502a039 100644 --- a/tests/test_packages/test_connections/test_tcp/test_communication.py +++ b/tests/test_packages/test_connections/test_tcp/test_communication.py @@ -20,6 +20,7 @@ """This module contains the tests for the TCP connection communication.""" import asyncio +import logging import struct import unittest.mock @@ -29,8 +30,6 @@ from aea.multiplexer import Multiplexer from aea.protocols.default.message import DefaultMessage -import packages - from tests.conftest import ( _make_tcp_client_connection, _make_tcp_server_connection, @@ -157,7 +156,7 @@ class TestTCPClientConnection: """Test TCP Client code.""" @pytest.mark.asyncio - async def test_receive_cancelled(self): + async def test_receive_cancelled(self, caplog): """Test that cancelling a receive task works correctly.""" port = get_unused_tcp_port() tcp_server = _make_tcp_server_connection("address_server", "127.0.0.1", port,) @@ -166,23 +165,21 @@ async def test_receive_cancelled(self): await tcp_server.connect() await tcp_client.connect() - with unittest.mock.patch.object( - packages.fetchai.connections.tcp.tcp_client.logger, "debug" - ) as mock_logger_debug: + with caplog.at_level( + logging.DEBUG, "aea.packages.fetchai.connections.tcp.tcp_client" + ): task = asyncio.ensure_future(tcp_client.receive()) await asyncio.sleep(0.1) task.cancel() await asyncio.sleep(0.1) - mock_logger_debug.assert_called_with( - "[{}] Read cancelled.".format("address_client") - ) + assert "[{}] Read cancelled.".format("address_client") in caplog.text assert task.result() is None await tcp_client.disconnect() await tcp_server.disconnect() @pytest.mark.asyncio - async def test_receive_raises_struct_error(self): + async def test_receive_raises_struct_error(self, caplog): """Test the case when a receive raises a struct error.""" port = get_unused_tcp_port() tcp_server = _make_tcp_server_connection("address_server", "127.0.0.1", port,) @@ -191,15 +188,15 @@ async def test_receive_raises_struct_error(self): await tcp_server.connect() await tcp_client.connect() - with unittest.mock.patch.object( - packages.fetchai.connections.tcp.tcp_client.logger, "debug" - ) as mock_logger_debug: + with caplog.at_level( + logging.DEBUG, "aea.packages.fetchai.connections.tcp.tcp_client" + ): with unittest.mock.patch.object( tcp_client, "_recv", side_effect=struct.error ): task = asyncio.ensure_future(tcp_client.receive()) await asyncio.sleep(0.1) - mock_logger_debug.assert_called_with("Struct error: ") + assert "Struct error: " in caplog.text assert task.result() is None await tcp_client.disconnect() @@ -231,7 +228,7 @@ class TestTCPServerConnection: """Test TCP Server code.""" @pytest.mark.asyncio - async def test_receive_raises_exception(self): + async def test_receive_raises_exception(self, caplog): """Test the case when a receive raises a generic exception.""" port = get_unused_tcp_port() tcp_server = _make_tcp_server_connection("address_server", "127.0.0.1", port,) @@ -240,17 +237,15 @@ async def test_receive_raises_exception(self): await tcp_server.connect() await tcp_client.connect() await asyncio.sleep(0.1) - with unittest.mock.patch.object( - packages.fetchai.connections.tcp.tcp_server.logger, "error" - ) as mock_logger_error: + with caplog.at_level( + logging.DEBUG, "aea.packages.fetchai.connections.tcp.tcp_server" + ): with unittest.mock.patch( "asyncio.wait", side_effect=Exception("generic exception") ): result = await tcp_server.receive() assert result is None - mock_logger_error.assert_called_with( - "Error in the receiving loop: generic exception" - ) + assert "Error in the receiving loop: generic exception" in caplog.text await tcp_client.disconnect() await tcp_server.disconnect() From 4246e2954812494b67045f9f72773d045ad3ebe0 Mon Sep 17 00:00:00 2001 From: MarcoFavorito Date: Tue, 14 Jul 2020 10:35:42 +0200 Subject: [PATCH 07/11] normalize the string format to be f-string --- aea/helpers/logging.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aea/helpers/logging.py b/aea/helpers/logging.py index 1a23901e91..6d014c1e07 100644 --- a/aea/helpers/logging.py +++ b/aea/helpers/logging.py @@ -37,7 +37,7 @@ def process( self, msg: Any, kwargs: MutableMapping[str, Any] ) -> Tuple[Any, MutableMapping[str, Any]]: """Prepend the agent name to every log message.""" - return "[%s] %s" % (self.extra["agent_name"], msg), kwargs + return f"[{self.extra['agent_name']}] {msg}", kwargs class WithLogger: From 03bb8bfa4c12b4a88c4e2b633e71c90152ef1f7f Mon Sep 17 00:00:00 2001 From: MarcoFavorito Date: Tue, 14 Jul 2020 11:13:54 +0200 Subject: [PATCH 08/11] fix connection package test due to bad log capturing --- aea/components/base.py | 4 +--- aea/helpers/logging.py | 12 +++++++++--- tests/conftest.py | 7 +++++++ 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/aea/components/base.py b/aea/components/base.py index 16a45bd7d8..ec7c8b7565 100644 --- a/aea/components/base.py +++ b/aea/components/base.py @@ -21,9 +21,8 @@ import logging import types from abc import ABC -from logging import Logger, LoggerAdapter from pathlib import Path -from typing import Dict, Optional, Union +from typing import Dict, Optional from aea.configurations.base import ( ComponentConfiguration, @@ -54,7 +53,6 @@ def __init__( self._configuration = configuration self._directory = None # type: Optional[Path] self._is_vendor = is_vendor - self._logger: Optional[Union[Logger, LoggerAdapter]] = None # mapping from import path to module object # the keys are dotted paths of Python modules. diff --git a/aea/helpers/logging.py b/aea/helpers/logging.py index 6d014c1e07..6cc46866ba 100644 --- a/aea/helpers/logging.py +++ b/aea/helpers/logging.py @@ -43,21 +43,27 @@ def process( class WithLogger: """Interface to endow subclasses with a logger.""" - def __init__(self, logger: Optional[Union[Logger, LoggerAdapter]] = None): + def __init__( + self, + logger: Optional[Union[Logger, LoggerAdapter]] = None, + default_logger_name: str = "aea", + ): """ Initialize the logger. :param logger: the logger object. + :param default_logger_name: the default logger name, if a logger is not provided. """ self._logger = logger + self._default_logger_name = default_logger_name @property def logger(self) -> Union[Logger, LoggerAdapter]: """Get the component logger.""" if self._logger is None: # if not set (e.g. programmatic instantiation) - # return a default one with "aea" as logger namespace. - return logging.getLogger("aea") + # return a default one with the default logger name. + return logging.getLogger(self._default_logger_name) return self._logger @logger.setter diff --git a/tests/conftest.py b/tests/conftest.py index 111ea5f360..a48f14038e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -669,6 +669,7 @@ def _make_oef_connection(address: Address, oef_addr: str, oef_port: int): oef_connection = OEFConnection( configuration=configuration, identity=Identity("name", address), ) + oef_connection._default_logger_name = "aea.packages.fetchai.connections.oef" return oef_connection @@ -679,6 +680,9 @@ def _make_tcp_server_connection(address: str, host: str, port: int): tcp_connection = TCPServerConnection( configuration=configuration, identity=Identity("name", address), ) + tcp_connection._default_logger_name = ( + "aea.packages.fetchai.connections.tcp.tcp_server" + ) return tcp_connection @@ -689,6 +693,9 @@ def _make_tcp_client_connection(address: str, host: str, port: int): tcp_connection = TCPClientConnection( configuration=configuration, identity=Identity("name", address), ) + tcp_connection._default_logger_name = ( + "aea.packages.fetchai.connections.tcp.tcp_client" + ) return tcp_connection From e875bb72d9897e45e3f07051e25c4d524fa2cdc8 Mon Sep 17 00:00:00 2001 From: MarcoFavorito Date: Tue, 14 Jul 2020 12:38:04 +0200 Subject: [PATCH 09/11] remove default logger in Component.__init__ --- aea/components/base.py | 2 +- tests/test_packages/test_connections/test_tcp/test_base.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/aea/components/base.py b/aea/components/base.py index ec7c8b7565..166defa01d 100644 --- a/aea/components/base.py +++ b/aea/components/base.py @@ -49,7 +49,7 @@ def __init__( :param configuration: the package configuration. :param is_vendor: whether the package is vendorized. """ - WithLogger.__init__(self, logging.getLogger("aea")) + WithLogger.__init__(self) self._configuration = configuration self._directory = None # type: Optional[Path] self._is_vendor = is_vendor diff --git a/tests/test_packages/test_connections/test_tcp/test_base.py b/tests/test_packages/test_connections/test_tcp/test_base.py index 4f7b1bcb03..4df98aaac5 100644 --- a/tests/test_packages/test_connections/test_tcp/test_base.py +++ b/tests/test_packages/test_connections/test_tcp/test_base.py @@ -63,7 +63,7 @@ async def test_connect_raises_exception(caplog): loop = asyncio.get_event_loop() tcp_connection.loop = loop - with caplog.at_level(logging.ERROR, "packages.fetchai.connections.tcp.base.logger"): + with caplog.at_level(logging.ERROR, "aea.packages.fetchai.connections.tcp"): with unittest.mock.patch.object( tcp_connection, "setup", side_effect=Exception("error during setup") ): From ebcc69ef317d4a59e1e5d8165f0342dde821416d Mon Sep 17 00:00:00 2001 From: MarcoFavorito Date: Tue, 14 Jul 2020 16:14:50 +0200 Subject: [PATCH 10/11] address PR comments --- aea/aea_builder.py | 20 +++++++++---------- .../connections/http_client/connection.py | 1 - .../connections/http_client/connection.yaml | 2 +- packages/hashes.csv | 2 +- 4 files changed, 12 insertions(+), 13 deletions(-) diff --git a/aea/aea_builder.py b/aea/aea_builder.py index 5461ec1dd5..0b043d98b6 100644 --- a/aea/aea_builder.py +++ b/aea/aea_builder.py @@ -880,12 +880,12 @@ def build(self, connection_ids: Optional[Collection[PublicId]] = None,) -> AEA: copy(self.private_key_paths), copy(self.connection_private_key_paths) ) identity = self._build_identity_from_wallet(wallet) - self._load_and_add_components(ComponentType.PROTOCOL, resources, identity) - self._load_and_add_components(ComponentType.CONTRACT, resources, identity) + self._load_and_add_components(ComponentType.PROTOCOL, resources, identity.name) + self._load_and_add_components(ComponentType.CONTRACT, resources, identity.name) self._load_and_add_components( ComponentType.CONNECTION, resources, - identity, + identity.name, identity=identity, crypto_store=wallet.connection_cryptos, ) @@ -910,7 +910,7 @@ def build(self, connection_ids: Optional[Collection[PublicId]] = None,) -> AEA: **deepcopy(self._context_namespace), ) self._load_and_add_components( - ComponentType.SKILL, resources, identity, agent_context=aea.context + ComponentType.SKILL, resources, identity.name, agent_context=aea.context ) self._build_called = True self._populate_contract_registry() @@ -1351,7 +1351,7 @@ def _load_and_add_components( self, component_type: ComponentType, resources: Resources, - aea_identity: Identity, + agent_name: str, **kwargs, ) -> None: """ @@ -1359,7 +1359,7 @@ def _load_and_add_components( :param component_type: the component type for which :param resources: the resources object to populate. - :param aea_identity: the identity of the AEA. + :param agent_name: the AEA name for logging purposes. :param kwargs: keyword argument to forward to the component loader. :return: None """ @@ -1376,7 +1376,7 @@ def _load_and_add_components( configuration = deepcopy(configuration) component = load_component_from_config(configuration, **kwargs) - _set_logger_to_component(component, configuration, aea_identity) + _set_logger_to_component(component, configuration, agent_name) resources.add_component(component) def _populate_contract_registry(self): @@ -1424,21 +1424,21 @@ def _check_we_can_build(self): def _set_logger_to_component( - component: Component, configuration: ComponentConfiguration, identity: Identity, + component: Component, configuration: ComponentConfiguration, agent_name: str, ) -> None: """ Set the logger to the component. :param component: the component instance. :param configuration: the component configuration - :param identity: the identity object of the AEA. + :param agent_name: the agent name :return: None """ if configuration.component_type == ComponentType.SKILL: # skip because skill object already have their own logger from the skill context. return logger_name = f"aea.packages.{configuration.author}.{configuration.component_type.to_plural()}.{configuration.name}" - logger = AgentLoggerAdapter(logging.getLogger(logger_name), identity.name) + logger = AgentLoggerAdapter(logging.getLogger(logger_name), agent_name) component.logger = logger diff --git a/packages/fetchai/connections/http_client/connection.py b/packages/fetchai/connections/http_client/connection.py index 0560858329..ce7f7f20ee 100644 --- a/packages/fetchai/connections/http_client/connection.py +++ b/packages/fetchai/connections/http_client/connection.py @@ -88,7 +88,6 @@ def __init__( self._tasks: Set[Task] = set() self.logger = logger - # TODO logger at this point is the module-level one, not with the agent name. self.logger.info("Initialised the HTTP client channel") async def connect(self, loop: AbstractEventLoop) -> None: diff --git a/packages/fetchai/connections/http_client/connection.yaml b/packages/fetchai/connections/http_client/connection.yaml index 316b33a890..5c05c1312c 100644 --- a/packages/fetchai/connections/http_client/connection.yaml +++ b/packages/fetchai/connections/http_client/connection.yaml @@ -7,7 +7,7 @@ license: Apache-2.0 aea_version: '>=0.5.0, <0.6.0' fingerprint: __init__.py: QmPdKAks8A6XKAgZiopJzPZYXJumTeUqChd8UorqmLQQPU - connection.py: QmZVEzoHKi5qRabE3fWF1zJYphcU4frxJQo9dWXnbDV5RL + connection.py: QmVYurcnjuRTK6CnuEc6qNbSykmZEzRMkjyGhknJKzKRQt fingerprint_ignore_patterns: [] protocols: - fetchai/http:0.3.0 diff --git a/packages/hashes.csv b/packages/hashes.csv index 015790745e..5be3c0a96c 100644 --- a/packages/hashes.csv +++ b/packages/hashes.csv @@ -19,7 +19,7 @@ fetchai/agents/thermometer_client,QmbcVyNwpHAwY8NPcgd2bdDpGhLLZLTyg6o78o67RXrNC1 fetchai/agents/weather_client,QmemjFHEFE32mXjP48NEX7prqaAHfW9wTM8mBAPfM4dUeA fetchai/agents/weather_station,QmQ8vVjVB4xDqjZwd5SH2skaqXFMkkBSX69j7VXM3ru2ez fetchai/connections/gym,QmXpTer28dVvxeXqsXzaBqX551QToh9w5KJC2oXcStpKJG -fetchai/connections/http_client,QmcvCouzByryadF1H9YbgDotRmV26F9QhrxFSdGmRgcdNV +fetchai/connections/http_client,QmUjtATHombNqbwHRonc3pLUTfuvQJBxqGAj4K5zKT8beQ fetchai/connections/http_server,QmXuGssPAahvRXHNmYrvtqYokgeCqavoiK7x9zmjQT8w23 fetchai/connections/ledger,QmVXceMJCioA1Hro9aJgBwrF9yLgToaVXifDz6EVo6vTXn fetchai/connections/local,QmZKciQTgE8LLHsgQX4F5Ecc7rNPp9BBSWQHEEe7jEMEmJ From 20f90168419c56ee110bdb76fb0fcb1091b3a295 Mon Sep 17 00:00:00 2001 From: MarcoFavorito Date: Tue, 14 Jul 2020 16:44:28 +0200 Subject: [PATCH 11/11] ignore pylint issue --- aea/registries/base.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/aea/registries/base.py b/aea/registries/base.py index 9b84dc4a29..3e162fdb6b 100644 --- a/aea/registries/base.py +++ b/aea/registries/base.py @@ -420,7 +420,9 @@ def unregister(self, item_id: Tuple[SkillId, str]) -> None: raise ValueError( "No item registered with component id '{}'".format(item_id) ) - self.logger.debug("Unregistering item with id {}".format(item_id)) + self.logger.debug( # pylint: disable=no-member + "Unregistering item with id {}".format(item_id) + ) handler = name_to_item.pop(item_name) if len(name_to_item) == 0: