Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase libp2ps connections code coverage to 100% #1497

Merged
merged 7 commits into from
Jul 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 21 additions & 24 deletions packages/fetchai/connections/p2p_libp2p/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@


async def _golang_module_build_async(
path: str, log_file_desc: IO[str], loop: Optional[asyncio.AbstractEventLoop] = None
path: str,
log_file_desc: IO[str],
loop: Optional[asyncio.AbstractEventLoop] = None,
timeout: float = LIBP2P_NODE_DEPS_DOWNLOAD_TIMEOUT,
) -> int:
"""
Builds go module located at `path`, downloads necessary dependencies
Expand All @@ -80,9 +83,7 @@ async def _golang_module_build_async(
golang_build = asyncio.ensure_future(proc.start())

try:
returncode = await asyncio.wait_for(
golang_build, LIBP2P_NODE_DEPS_DOWNLOAD_TIMEOUT
)
returncode = await asyncio.wait_for(golang_build, timeout)
lrahmani marked this conversation as resolved.
Show resolved Hide resolved
except asyncio.TimeoutError:
e = Exception(
"Failed to download libp2p dependencies within timeout({})".format(
Expand Down Expand Up @@ -186,7 +187,7 @@ def __init__(
def __str__(self):
return "{}:{}".format(self._host, self._port)

def __repr__(self):
def __repr__(self): # pragma: no cover
return self.__str__()

@property
Expand Down Expand Up @@ -293,10 +294,6 @@ async def start(self) -> None:

:return: None
"""
which = shutil.which("go")
if which is None:
raise Exception("Libp2p Go should me installed")
Comment on lines -296 to -298
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why has this been removed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because connection already does the check (using _check_go_installed())


if self._loop is None:
self._loop = asyncio.get_event_loop()

Expand Down Expand Up @@ -325,9 +322,9 @@ async def start(self) -> None:
out_path = self.aea_to_libp2p_path
logger.debug("Creating pipes ({}, {})...".format(in_path, out_path))
if os.path.exists(in_path):
os.remove(in_path)
os.remove(in_path) # pragma: no cover
if os.path.exists(out_path):
os.remove(out_path)
os.remove(out_path) # pragma: no cover
os.mkfifo(in_path)
os.mkfifo(out_path)

Expand Down Expand Up @@ -407,7 +404,7 @@ async def _connect(self) -> None:
await self._connect()
return
else:
raise e
raise e # pragma: no cover

# setup reader
assert (
Expand Down Expand Up @@ -450,11 +447,11 @@ async def read(self) -> Optional[bytes]:
try:
logger.debug("Waiting for messages...")
buf = await self._stream_reader.readexactly(4)
if not buf:
if not buf: # pragma: no cover
return None
size = struct.unpack("!I", buf)[0]
data = await self._stream_reader.readexactly(size)
if not data:
if not data: # pragma: no cover
return None
return data
except asyncio.streams.IncompleteReadError as e:
Expand Down Expand Up @@ -510,7 +507,7 @@ def stop(self) -> None:
)
self.proc.wait()
else:
logger.debug("Called stop when process not set!")
logger.debug("Called stop when process not set!") # pragma: no cover
if os.path.exists(LIBP2P_NODE_ENV_FILE):
os.remove(LIBP2P_NODE_ENV_FILE)

Expand Down Expand Up @@ -544,7 +541,7 @@ def __init__(self, **kwargs):
if (
self.has_crypto_store
and self.crypto_store.crypto_objects.get("fetchai", None) is not None
):
): # pragma: no cover
key = cast(FetchAICrypto, self.crypto_store.crypto_objects["fetchai"])
elif libp2p_key_file is not None:
key = FetchAICrypto(libp2p_key_file)
Expand Down Expand Up @@ -576,7 +573,7 @@ def __init__(self, **kwargs):
raise ValueError(
"At least one Entry Peer should be provided when node can not be publically reachable"
)
if delegate_uri is not None:
if delegate_uri is not None: # pragma: no cover
logger.warning(
"Ignoring Delegate Uri configuration as node can not be publically reachable"
)
Expand Down Expand Up @@ -610,12 +607,12 @@ def __init__(self, **kwargs):
self._receive_from_node_task = None # type: Optional[asyncio.Future]

@property
def libp2p_address(self) -> str:
def libp2p_address(self) -> str: # pragma: no cover
"""The address used by the node."""
return self.node.pub

@property
def libp2p_address_id(self) -> str:
def libp2p_address_id(self) -> str: # pragma: no cover
"""The identifier for the address."""
return LIBP2P

Expand All @@ -625,7 +622,7 @@ async def connect(self) -> None:

:return: None
"""
if self.connection_status.is_connected:
if self.connection_status.is_connected: # pragma: no cover
return
try:
# start libp2p node
Expand Down Expand Up @@ -682,7 +679,7 @@ async def receive(self, *args, **kwargs) -> Optional["Envelope"]:
# TOFIX(LR) attempt restarting the node?
logger.debug("Received data: {}".format(data))
return Envelope.decode(data)
except CancelledError:
except CancelledError: # pragma: no cover
logger.debug("Receive cancelled.")
return None
except Exception as e: # pragma: nocover # pylint: disable=broad-except
Expand All @@ -705,17 +702,17 @@ async def _receive_from_node(self) -> None:
"""
while True:
data = await self.node.read()
if data is None:
break
assert self._in_queue is not None, "Input queue not initialized."
self._in_queue.put_nowait(data)
if data is None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why this was moved here; data is already used in line 706

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now you might be putting None on the queue

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, exactly.
The _receive_from_node task will exit and the multiplexer will receive None on its scheduled receive

break

@staticmethod
def _check_go_installed() -> None:
"""Checks if go is installed. Sys.exits if not"""
res = shutil.which("go")
if res is None:
raise AEAException(
raise AEAException( # pragma: nocover
"Please install go before running the `fetchai/p2p_libp2p:0.1.0` connection. "
"Go is available for download here: https://golang.org/doc/install"
)
2 changes: 1 addition & 1 deletion packages/fetchai/connections/p2p_libp2p/connection.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ fingerprint:
aea/api.go: QmW5fUpVZmV3pxgoakm3RvsvCGC6FwT2XprcqXHM8rBXP5
aea/envelope.pb.go: QmRfUNGpCeVJfsW3H1MzCN4pwDWgumfyWufVFp6xvUjjug
aea/envelope.proto: QmSC8EGCKiNFR2vf5bSWymSzYDFMipQW9aQVMwPzQoKb4n
connection.py: QmPJhRWhULLNMb1uxexfERUQvntN3FEBsgN5ELv38oDwA4
connection.py: QmaGCXNSyuW5MP428XB1FxUt6sAYdsNrh13UWLDFeBoP7b
dht/dhtclient/dhtclient.go: QmNnU1pVCUtj8zJ1Pz5eMk9sznsjPFSJ9qDkzbrNwzEecV
dht/dhtclient/dhtclient_test.go: QmPfnHSHXtbaW5VYuq1QsKQWey64pUEvLEaKKkT9eAcmws
dht/dhtclient/options.go: QmPorj38wNrxGrzsbFe5wwLmiHzxbTJ2VsgvSd8tLDYS8s
Expand Down
26 changes: 13 additions & 13 deletions packages/fetchai/connections/p2p_libp2p_client/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def __init__(
def __str__(self):
return "{}:{}".format(self._host, self._port)

def __repr__(self):
def __repr__(self): # pragma: no cover
return self.__str__()

@property
Expand Down Expand Up @@ -106,12 +106,12 @@ def __init__(self, **kwargs):
if (
self.has_crypto_store
and self.crypto_store.crypto_objects.get("fetchai", None) is not None
):
): # pragma: no cover
key = cast(FetchAICrypto, self.crypto_store.crypto_objects["fetchai"])
elif key_file is None:
key = FetchAICrypto()
else:
elif key_file is not None:
key = FetchAICrypto(key_file)
else:
key = FetchAICrypto()

# client connection id
self.key = key
Expand Down Expand Up @@ -144,7 +144,7 @@ async def connect(self) -> None:

:return: None
"""
if self.connection_status.is_connected:
if self.connection_status.is_connected: # pragma: no cover
return
if self._loop is None:
self._loop = asyncio.get_event_loop()
Expand Down Expand Up @@ -211,7 +211,7 @@ async def disconnect(self) -> None:

if self._in_queue is not None:
self._in_queue.put_nowait(None)
else:
else: # pragma: no cover
logger.debug("Called disconnect when input queue not initialized.")

async def receive(self, *args, **kwargs) -> Optional["Envelope"]:
Expand All @@ -234,10 +234,10 @@ async def receive(self, *args, **kwargs) -> Optional["Envelope"]:
# TOFIX(LR) attempt restarting the node?
logger.debug("Received data: {}".format(data))
return Envelope.decode(data)
except CancelledError:
except CancelledError: # pragma: no cover
logger.debug("Receive cancelled.")
return None
except Exception as e: # pragma: nocover # pylint: disable=broad-except
except Exception as e: # pragma: no cover # pylint: disable=broad-except
logger.exception(e)
return None

Expand All @@ -257,10 +257,10 @@ async def _process_messages(self) -> None:
"""
while True:
data = await self._receive()
if data is None:
break
assert self._in_queue is not None, "Input queue not initialized."
self._in_queue.put_nowait(data)
if data is None:
break

async def _send(self, data: bytes) -> None:
assert self._writer is not None
Expand All @@ -274,11 +274,11 @@ async def _receive(self) -> Optional[bytes]:
try:
logger.debug("Waiting for messages...")
buf = await self._reader.readexactly(4)
if not buf:
if not buf: # pragma: no cover
return None
size = struct.unpack("!I", buf)[0]
data = await self._reader.readexactly(size)
if not data:
if not data: # pragma: no cover
return None
return data
except asyncio.streams.IncompleteReadError as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license: Apache-2.0
aea_version: '>=0.5.0, <0.6.0'
fingerprint:
__init__.py: QmT1FEHkPGMHV5oiVEfQHHr25N2qdZxydSNRJabJvYiTgf
connection.py: QmT9ncNDy27GXAqtmJJDFQep2M8Qn7ycih7E8tMT2PwS3i
connection.py: QmZfEw3G2LXEivmGu9UodJwhptcRCz3BYkRGfepuRfhGWU
fingerprint_ignore_patterns: []
protocols: []
class_name: P2PLibp2pClientConnection
Expand Down
4 changes: 2 additions & 2 deletions packages/hashes.csv
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ fetchai/connections/ledger,QmVXceMJCioA1Hro9aJgBwrF9yLgToaVXifDz6EVo6vTXn
fetchai/connections/local,QmVcTEJxGbWbtXi2fLN5eJA6XuEAneaNd83UJPugrtb9xU
fetchai/connections/oef,QmdkQ9hUbJ8HsJD5qxSPRae9s2G9LZXFhfJabeHBVVYMJi
fetchai/connections/p2p_client,QmbwCDuAB1eq6JikqeAAqpqjVhxevGNeWCLqRD67Uvqiaz
fetchai/connections/p2p_libp2p,QmPriLUrxDUQSfuE42rdEvGv5EeAT9dLGBtBD6vB5Kf147
fetchai/connections/p2p_libp2p_client,QmVhsh863k3ws4HeDpkZm7GQkrW3aMREu5sLkHATmwCddC
fetchai/connections/p2p_libp2p,QmT5sbnNb4mz3EYbRJBatcGEPht8s2QWGsG98unVSfWuhJ
fetchai/connections/p2p_libp2p_client,QmY4vR6r4XqqWw25Q3bTmPcXMcaVAkAs3RJjEWyVEe81kv
fetchai/connections/p2p_stub,QmSBRr26YELdbYk9nAurw3XdQ3Myj7cVgCDZZMv7DMrsdg
fetchai/connections/scaffold,QmTzEeEydjohZNTsAJnoGMtzTgCyzMBQCYgbTBLfqWtw5w
fetchai/connections/soef,QmcXsmtNeio1CPnZbZFvQLuvfQyZhX5K8mP624cB66DnjF
Expand Down
8 changes: 4 additions & 4 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ def _make_libp2p_connection(
identity = Identity("", address=FetchAICrypto().address)
if relay and delegate:
configuration = ConnectionConfig(
libp2p_key_file=None,
node_key_file=None,
local_uri="{}:{}".format(host, port),
public_uri="{}:{}".format(host, port),
entry_peers=entry_peers,
Expand All @@ -742,7 +742,7 @@ def _make_libp2p_connection(
)
elif relay and not delegate:
configuration = ConnectionConfig(
libp2p_key_file=None,
node_key_file=None,
local_uri="{}:{}".format(host, port),
public_uri="{}:{}".format(host, port),
entry_peers=entry_peers,
Expand All @@ -751,7 +751,7 @@ def _make_libp2p_connection(
)
else:
configuration = ConnectionConfig(
libp2p_key_file=None,
node_key_file=None,
local_uri="{}:{}".format(host, port),
entry_peers=entry_peers,
log_file=log_file,
Expand All @@ -761,7 +761,7 @@ def _make_libp2p_connection(


def _make_libp2p_client_connection(
node_port: int = 11234, node_host: str = "127.0.0.1",
node_port: int = 11234, node_host: str = "127.0.0.1"
) -> P2PLibp2pClientConnection:
identity = Identity("", address=FetchAICrypto().address)
configuration = ConnectionConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from aea.multiplexer import Multiplexer
from aea.protocols.default.message import DefaultMessage

from packages.fetchai.connections.p2p_libp2p.connection import Uri

from tests.conftest import (
_make_libp2p_connection,
libp2p_log_on_failure,
Expand Down Expand Up @@ -508,3 +510,10 @@ def teardown_class(cls):
shutil.rmtree(cls.t)
except (OSError, IOError):
pass


@skip_test_windows
def test_libp2pconnection_uri():
uri = Uri(host="127.0.0.1")
uri = Uri(host="127.0.0.1", port=10000)
assert uri.host == "127.0.0.1" and uri.port == 10000
Loading