Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

riotctrl_shell: provide netif interactions and parsers #14441

Merged
merged 3 commits into from
Jul 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
382 changes: 382 additions & 0 deletions dist/pythonlibs/riotctrl_shell/netif.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,382 @@
# Copyright (C) 2020 Freie Universität Berlin
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.

"""
Shell interactions related to network interfaces

Defines shell command interactions related to network interfaces
"""

import re

from riotctrl.shell import ShellInteraction, ShellInteractionParser


# ==== Parsers ====

class IfconfigListParser(ShellInteractionParser):
def __init__(self):
self.iface_c = re.compile(r"Iface\s+(?P<name>\S+)\s")
# option values are repetitions of at least one non white space
# separated by at most one whitespace
# e.g. for MCS: 1 (BPSK, rate 1/2, 2x frequency repetition) MTU :1280
# "1 (BPSK, rate 1/2, 2x frequency repetition)" belongs to the option
# value, "MTU" does not
self.option_c = re.compile(r"^(?P<option>[^:]+):\s?"
r"(?P<value>\S+(\s\S+)*)$")
# options are evaluated before flags, so all options that don't contain
# colons are flags
self.flag_c = re.compile(r"^(?P<flag>[^:]+)$")
self.ipv6_c = re.compile(r"inet6 (?P<type>addr|group): "
r"(?P<addr>[0-9a-f:]+)(\s+"
r"scope:\s+(?P<scope>\S+)"
r"(?P<anycast>\s+\[anycast\])?\s+"
r"(?P<state>\S+))?$")
self.bl_header_c = re.compile(r"(?P<mode>White|Black)-listed "
r"link layer addresses:")
self.bl_c = re.compile(r"\d+: (?P<addr>[0-9a-f]{2}(:[0-9a-f]{2})*)$")

def parse(self, cmd_output):
"""
Parses output of Ifconfig::ifconfig_list()

See tests (tests/test_netif_list_parse.py) for further possible items:

>>> parser = IfconfigListParser()
>>> res = parser.parse("Iface WP_01 HWAddr: ab:cd 6LO PROMISC\\n"
... "Iface ET_01 HWaddr: 01:23:45:67:89:AB\\n")
>>> len(res)
2
>>> res["WP_01"]["hwaddr"]
'ab:cd'
>>> sorted(res["WP_01"]["flags"])
['6LO', 'PROMISC']
>>> res["ET_01"]["hwaddr"]
'01:23:45:67:89:AB'
"""
netifs = None
current = None
parse_ipv6 = False
parse_blacklist = False
stats_parser = IfconfigStatsParser()
offset = 0
for line in cmd_output.splitlines():
m = self.iface_c.search(line)
if m is not None:
name = m.group("name")
if netifs is None:
netifs = {}
current = netifs[name] = {}
# Go ahead in line to not confuse options parser
line = line[m.end():]
offset += m.end() + 1
if current is not None:
# XXX checking for IPv4 address might also go here
if "ipv6_addrs" not in current:
parse_ipv6 = self.ipv6_c.search(line) is not None
if not parse_ipv6 and not parse_blacklist:
self._parse_netif_option(current, line)
if parse_ipv6:
parse_ipv6 = self._parse_ipv6(current, line)
elif parse_blacklist:
m = self.bl_c.search(line)
if m is not None:
if "blacklist" in current:
current["blacklist"].append(m.group("addr"))
else:
current["whitelist"].append(m.group("addr"))
else:
parse_blacklist = False
elif not parse_blacklist and \
"blacklist" not in current and \
"whitelist" not in current:
m = self.bl_header_c.search(line)
if m is not None:
if m.group("mode") == "Black":
current["blacklist"] = []
else:
current["whitelist"] = []
parse_blacklist = True
m = stats_parser.header_c.search(line)
if m is not None:
stats = stats_parser.parse(cmd_output[offset:])
if stats is not None:
current["stats"] = stats
# assume stats to be always last
break
offset += len(line)
return netifs

@staticmethod
def _snake_case(option):
"""
Converts all option names parsed by _parse_netif_option() to snake_case

>>> IfconfigListParser._snake_case("Max. Retrans.")
'max_retrans'
"""
return re.sub(r"\W+", "_", option.strip().lower()).strip("_")

@staticmethod
def _convert_value(value_str):
"""
Tries to converts an option value parsed by _parse_netif_option() to
int or float if possible

>>> IfconfigListParser._convert_value("12345")
12345
>>> IfconfigListParser._convert_value("0xf")
15
>>> IfconfigListParser._convert_value("3.14")
3.14
>>> IfconfigListParser._convert_value("3.14ispi")
'3.14ispi'
"""
try:
# try to convert to int
return int(value_str)
except ValueError:
try:
# next try to convert to int from hex
if value_str.startswith("0x"):
return int(value_str[2:], base=16)
else:
return int(value_str, base=16)
except ValueError:
try:
# next try to convert to float
return float(value_str)
except ValueError:
# lastly just use the string
return value_str

def _parse_netif_option(self, netif, line):
"""Parses all the options found before the IP address listing"""
# remove potential content before line like logging tag, date etc.
line = line.split(" ")[-1]
# options and flags are separated by two spaces
for token in line.strip().split(" "):
# ensure there are no whitespaces at start or end => output bug
assert token == token.strip()
m = self.option_c.search(token)
if m is not None:
option = self._snake_case(m.group("option"))
value_str = m.group("value").strip()
netif[option] = self._convert_value(value_str)
m = self.flag_c.search(token)
if m is not None:
flag = m.group("flag")
if "flags" in netif:
netif["flags"].append(flag)
else:
netif["flags"] = [flag]

def _parse_ipv6(self, netif, line):
"""
Parses IPv6 unicast, anycast, and multicast addresses
"""
m = self.ipv6_c.search(line)
if m is not None:
addr = m.groupdict()
typ = addr.pop("type")
if typ == "addr": # unicast address
# reformat anycast item if existent
if addr.get("anycast") is None:
addr.pop("anycast", None)
else:
addr["anycast"] = True
if "ipv6_addrs" in netif:
netif["ipv6_addrs"].append(addr)
else:
netif["ipv6_addrs"] = [addr]
else: # multicast address
for key in set(addr):
# remove empty matches
if addr[key] is None:
del addr[key]
if "ipv6_groups" in netif:
netif["ipv6_groups"].append(addr)
else:
netif["ipv6_groups"] = [addr]
return True
return False


class IfconfigStatsParser(ShellInteractionParser):
def __init__(self):
self.header_c = re.compile(r"Statistics for (?P<module>.+)$")
self.rx_c = re.compile(r"RX packets\s+(?P<packets>\d+)\s+"
r"bytes\s+(?P<bytes>\d+)$")
self.tx_c = re.compile(r"TX packets\s+(?P<packets>\d+)\s+"
r"\(Multicast:\s+(?P<multicast>\d+)\)\s+"
r"bytes\s+(?P<bytes>\d+)$")
self.tx_err_c = re.compile(r"TX succeeded\s+(?P<succeeded>\d+)\s+"
r"errors\s+(?P<errors>\d+)$")

def parse(self, cmd_output):
"""
Parses output of Ifconfig::ifconfig_stats or the statistics part of
Ifconfig::ifconfig_list.

:param cmd_output(str): output of Ifconfig::ifconfig_stats or
Ifconfig::ifconfig_list
:return: dictionary with one entry per statistics module, each module
containing an entry 'rx' and 'tx' with respective statistics

>>> parser = IfconfigStatsParser()
>>> res = parser.parse(
... "Statistics for IPv6\\n"
... " RX packets 14 bytes 1104\\n"
... " TX packets 3 (Multicast: 1) bytes 192\\n"
... " TX succeeded 3 errors 0\\n")
>>> sorted(res)
['IPv6']
>>> sorted(res["IPv6"])
['rx', 'tx']
>>> sorted(res["IPv6"]["rx"])
['bytes', 'packets']
>>> sorted(res["IPv6"]["tx"])
['bytes', 'errors', 'multicast', 'packets', 'succeeded']
>>> res["IPv6"]["rx"]["bytes"]
1104
"""
stats = None
current = None
for line in cmd_output.splitlines():
line = line.strip()
m = self.header_c.search(line)
if m is not None:
module = m.group("module")
if stats is None:
stats = {}
current = stats[module] = {}
if current is not None:
if "rx" not in current:
m = self.rx_c.search(line)
if m is not None:
current["rx"] = {k: int(v)
for k, v in m.groupdict().items()}
elif "tx" not in current:
m = self.tx_c.search(line)
if m is not None:
current["tx"] = {k: int(v)
for k, v in m.groupdict().items()}
elif "tx" in current:
m = self.tx_err_c.search(line)
if m is not None:
current["tx"].update(
{k: int(v) for k, v in m.groupdict().items()}
)
return stats


# ==== ShellInteractions ====

class Ifconfig(ShellInteraction):
def ifconfig_list(self, netif=None, timeout=-1, async_=False):
return self.ifconfig_cmd(netif=netif, timeout=timeout, async_=async_)

def ifconfig_cmd(self, netif=None, args=None, timeout=-1, async_=False):
cmd = "ifconfig"
if netif is not None:
cmd += " {netif}".format(netif=netif)
if args is not None:
if netif is None:
raise ValueError("netif required when args are provided")
cmd += " {args}".format(args=" ".join(args))
return self.cmd(cmd, timeout=timeout, async_=False)

def ifconfig_help(self, netif, timeout=-1, async_=False):
return self.ifconfig_cmd(netif=netif, args=("help",),
timeout=timeout, async_=async_)

def ifconfig_set(self, netif, key, value, timeout=-1, async_=False):
return self._ifconfig_success_cmd(netif=netif,
args=("set", key, value),
timeout=timeout, async_=async_)

def ifconfig_up(self, netif, timeout=-1, async_=False):
self._ifconfig_error_cmd(netif=netif, args=("up",),
timeout=timeout, async_=async_)

def ifconfig_down(self, netif, timeout=-1, async_=False):
self._ifconfig_error_cmd(netif=netif, args=("down",),
timeout=timeout, async_=async_)

def ifconfig_add(self, netif, addr, anycast=False,
timeout=-1, async_=False):
args = ["add", addr]
if anycast:
args.append("anycast")
return self._ifconfig_success_cmd(netif=netif, args=args,
timeout=timeout, async_=async_)

def ifconfig_del(self, netif, addr, timeout=-1, async_=False):
return self._ifconfig_success_cmd(netif=netif, args=("del", addr),
timeout=timeout, async_=async_)

def ifconfig_flag(self, netif, flag, enable=True,
timeout=-1, async_=False):
return self._ifconfig_success_cmd(
netif=netif, args=("{}{}".format("" if enable else "-", flag),),
timeout=timeout, async_=async_
)

def ifconfig_l2filter_add(self, netif, addr, timeout=-1, async_=False):
return self._ifconfig_success_cmd(
netif=netif, args=("l2filter", "add", addr),
timeout=timeout, async_=async_
)

def ifconfig_l2filter_del(self, netif, addr, timeout=-1, async_=False):
return self._ifconfig_success_cmd(
netif=netif, args=("l2filter", "del", addr),
timeout=timeout, async_=async_
)

def ifconfig_stats(self, netif, module, timeout=-1, async_=False):
res = self.ifconfig_cmd(netif=netif, args=("stats", module),
timeout=timeout, async_=async_)
if "Statistics for " in res:
return res
raise RuntimeError(res)

def ifconfig_stats_reset(self, netif, module, timeout=-1, async_=False):
res = self.ifconfig_cmd(netif=netif, args=("stats", module, "reset"),
timeout=timeout, async_=async_)
if "Reset statistics for module " in res:
return res
raise RuntimeError(res)

def _ifconfig_success_cmd(self, netif=None, args=None,
timeout=-1, async_=False):
"""For commands that have a success output"""
res = self.ifconfig_cmd(netif=netif, args=args,
timeout=timeout, async_=async_)
if "success" in res:
return res
raise RuntimeError(res)

def _ifconfig_error_cmd(self, netif=None, args=None,
timeout=-1, async_=False):
"""For commands that only have an error output"""
res = self.ifconfig_cmd(netif=netif, args=args,
timeout=timeout, async_=async_)
if "error" in res:
raise RuntimeError(res)


class TXTSnd(ShellInteraction):
def netif_txtsnd(self, netif, target, data, timeout=-1, async_=False):
cmd = "txtsnd {netif} {target} {data}".format(
netif=netif,
target=target,
data=data
)
res = self.cmd(cmd)
if "error" in res or "usage" in res:
raise RuntimeError(res)
return res
Loading