Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase libp2ps connections code coverage to 100% #1497

Merged
merged 7 commits into from
Jul 14, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 21 additions & 24 deletions packages/fetchai/connections/p2p_libp2p/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@


async def _golang_module_build_async(
path: str, log_file_desc: IO[str], loop: Optional[asyncio.AbstractEventLoop] = None
path: str,
log_file_desc: IO[str],
loop: Optional[asyncio.AbstractEventLoop] = None,
timeout: Optional[float] = LIBP2P_NODE_DEPS_DOWNLOAD_TIMEOUT,
lrahmani marked this conversation as resolved.
Show resolved Hide resolved
) -> int:
"""
Builds go module located at `path`, downloads necessary dependencies
Expand All @@ -80,9 +83,7 @@ async def _golang_module_build_async(
golang_build = asyncio.ensure_future(proc.start())

try:
returncode = await asyncio.wait_for(
golang_build, LIBP2P_NODE_DEPS_DOWNLOAD_TIMEOUT
)
returncode = await asyncio.wait_for(golang_build, timeout)
lrahmani marked this conversation as resolved.
Show resolved Hide resolved
except asyncio.TimeoutError:
e = Exception(
"Failed to download libp2p dependencies within timeout({})".format(
Expand Down Expand Up @@ -186,7 +187,7 @@ def __init__(
def __str__(self):
return "{}:{}".format(self._host, self._port)

def __repr__(self):
def __repr__(self): # pragma: no cover
return self.__str__()

@property
Expand Down Expand Up @@ -293,10 +294,6 @@ async def start(self) -> None:

:return: None
"""
which = shutil.which("go")
if which is None:
raise Exception("Libp2p Go should me installed")
Comment on lines -296 to -298
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why has this been removed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because connection already does the check (using _check_go_installed())


if self._loop is None:
self._loop = asyncio.get_event_loop()

Expand Down Expand Up @@ -325,9 +322,9 @@ async def start(self) -> None:
out_path = self.aea_to_libp2p_path
logger.debug("Creating pipes ({}, {})...".format(in_path, out_path))
if os.path.exists(in_path):
os.remove(in_path)
os.remove(in_path) # pragma: no cover
if os.path.exists(out_path):
os.remove(out_path)
os.remove(out_path) # pragma: no cover
os.mkfifo(in_path)
os.mkfifo(out_path)

Expand Down Expand Up @@ -407,7 +404,7 @@ async def _connect(self) -> None:
await self._connect()
return
else:
raise e
raise e # pragma: no cover

# setup reader
assert (
Expand Down Expand Up @@ -450,11 +447,11 @@ async def read(self) -> Optional[bytes]:
try:
logger.debug("Waiting for messages...")
buf = await self._stream_reader.readexactly(4)
if not buf:
if not buf: # pragma: no cover
return None
size = struct.unpack("!I", buf)[0]
data = await self._stream_reader.readexactly(size)
if not data:
if not data: # pragma: no cover
return None
return data
except asyncio.streams.IncompleteReadError as e:
Expand Down Expand Up @@ -510,7 +507,7 @@ def stop(self) -> None:
)
self.proc.wait()
else:
logger.debug("Called stop when process not set!")
logger.debug("Called stop when process not set!") # pragma: no cover
if os.path.exists(LIBP2P_NODE_ENV_FILE):
os.remove(LIBP2P_NODE_ENV_FILE)

Expand Down Expand Up @@ -544,7 +541,7 @@ def __init__(self, **kwargs):
if (
self.has_crypto_store
and self.crypto_store.crypto_objects.get("fetchai", None) is not None
):
): # pragma: no cover
key = cast(FetchAICrypto, self.crypto_store.crypto_objects["fetchai"])
elif libp2p_key_file is not None:
key = FetchAICrypto(libp2p_key_file)
Expand Down Expand Up @@ -576,7 +573,7 @@ def __init__(self, **kwargs):
raise ValueError(
"At least one Entry Peer should be provided when node can not be publically reachable"
)
if delegate_uri is not None:
if delegate_uri is not None: # pragma: no cover
logger.warning(
"Ignoring Delegate Uri configuration as node can not be publically reachable"
)
Expand Down Expand Up @@ -610,12 +607,12 @@ def __init__(self, **kwargs):
self._receive_from_node_task = None # type: Optional[asyncio.Future]

@property
def libp2p_address(self) -> str:
def libp2p_address(self) -> str: # pragma: no cover
"""The address used by the node."""
return self.node.pub

@property
def libp2p_address_id(self) -> str:
def libp2p_address_id(self) -> str: # pragma: no cover
"""The identifier for the address."""
return LIBP2P

Expand All @@ -625,7 +622,7 @@ async def connect(self) -> None:

:return: None
"""
if self.connection_status.is_connected:
if self.connection_status.is_connected: # pragma: no cover
return
try:
# start libp2p node
Expand Down Expand Up @@ -682,7 +679,7 @@ async def receive(self, *args, **kwargs) -> Optional["Envelope"]:
# TOFIX(LR) attempt restarting the node?
logger.debug("Received data: {}".format(data))
return Envelope.decode(data)
except CancelledError:
except CancelledError: # pragma: no cover
logger.debug("Receive cancelled.")
return None
except Exception as e: # pragma: nocover # pylint: disable=broad-except
Expand All @@ -705,17 +702,17 @@ async def _receive_from_node(self) -> None:
"""
while True:
data = await self.node.read()
if data is None:
break
assert self._in_queue is not None, "Input queue not initialized."
self._in_queue.put_nowait(data)
if data is None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why this was moved here; data is already used in line 706

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now you might be putting None on the queue

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, exactly.
The _receive_from_node task will exit and the multiplexer will receive None on its scheduled receive

break

@staticmethod
def _check_go_installed() -> None:
"""Checks if go is installed. Sys.exits if not"""
res = shutil.which("go")
if res is None:
raise AEAException(
raise AEAException( # pragma: nocover
"Please install go before running the `fetchai/p2p_libp2p:0.1.0` connection. "
"Go is available for download here: https://golang.org/doc/install"
)
2 changes: 1 addition & 1 deletion packages/fetchai/connections/p2p_libp2p/connection.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ fingerprint:
aea/api.go: QmW5fUpVZmV3pxgoakm3RvsvCGC6FwT2XprcqXHM8rBXP5
aea/envelope.pb.go: QmRfUNGpCeVJfsW3H1MzCN4pwDWgumfyWufVFp6xvUjjug
aea/envelope.proto: QmSC8EGCKiNFR2vf5bSWymSzYDFMipQW9aQVMwPzQoKb4n
connection.py: QmPJhRWhULLNMb1uxexfERUQvntN3FEBsgN5ELv38oDwA4
connection.py: Qma9S5jWdTzuz93Y7WQsABai9xPGchfvnWoz2u5RRXwYU9
dht/dhtclient/dhtclient.go: QmNnU1pVCUtj8zJ1Pz5eMk9sznsjPFSJ9qDkzbrNwzEecV
dht/dhtclient/dhtclient_test.go: QmPfnHSHXtbaW5VYuq1QsKQWey64pUEvLEaKKkT9eAcmws
dht/dhtclient/options.go: QmPorj38wNrxGrzsbFe5wwLmiHzxbTJ2VsgvSd8tLDYS8s
Expand Down
2 changes: 1 addition & 1 deletion packages/hashes.csv
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ fetchai/connections/ledger,QmVXceMJCioA1Hro9aJgBwrF9yLgToaVXifDz6EVo6vTXn
fetchai/connections/local,QmVcTEJxGbWbtXi2fLN5eJA6XuEAneaNd83UJPugrtb9xU
fetchai/connections/oef,QmdkQ9hUbJ8HsJD5qxSPRae9s2G9LZXFhfJabeHBVVYMJi
fetchai/connections/p2p_client,QmbwCDuAB1eq6JikqeAAqpqjVhxevGNeWCLqRD67Uvqiaz
fetchai/connections/p2p_libp2p,QmPriLUrxDUQSfuE42rdEvGv5EeAT9dLGBtBD6vB5Kf147
fetchai/connections/p2p_libp2p,QmZM7xR551DCLhF37VemXyk8sRoRhQAS9GXJe3CvT2foi8
fetchai/connections/p2p_libp2p_client,QmVhsh863k3ws4HeDpkZm7GQkrW3aMREu5sLkHATmwCddC
fetchai/connections/p2p_stub,QmSBRr26YELdbYk9nAurw3XdQ3Myj7cVgCDZZMv7DMrsdg
fetchai/connections/scaffold,QmTzEeEydjohZNTsAJnoGMtzTgCyzMBQCYgbTBLfqWtw5w
Expand Down
6 changes: 3 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ def _make_libp2p_connection(
identity = Identity("", address=FetchAICrypto().address)
if relay and delegate:
configuration = ConnectionConfig(
libp2p_key_file=None,
node_key_file=None,
local_uri="{}:{}".format(host, port),
public_uri="{}:{}".format(host, port),
entry_peers=entry_peers,
Expand All @@ -742,7 +742,7 @@ def _make_libp2p_connection(
)
elif relay and not delegate:
configuration = ConnectionConfig(
libp2p_key_file=None,
node_key_file=None,
local_uri="{}:{}".format(host, port),
public_uri="{}:{}".format(host, port),
entry_peers=entry_peers,
Expand All @@ -751,7 +751,7 @@ def _make_libp2p_connection(
)
else:
configuration = ConnectionConfig(
libp2p_key_file=None,
node_key_file=None,
local_uri="{}:{}".format(host, port),
entry_peers=entry_peers,
log_file=log_file,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from aea.multiplexer import Multiplexer
from aea.protocols.default.message import DefaultMessage

from packages.fetchai.connections.p2p_libp2p.connection import Uri

from tests.conftest import (
_make_libp2p_connection,
libp2p_log_on_failure,
Expand Down Expand Up @@ -508,3 +510,10 @@ def teardown_class(cls):
shutil.rmtree(cls.t)
except (OSError, IOError):
pass


@skip_test_windows
def test_libp2pconnection_uri():
uri = Uri(host="127.0.0.1")
uri = Uri(host="127.0.0.1", port=10000)
assert uri.host == "127.0.0.1" and uri.port == 10000
Loading