forked from sonic-net/sonic-buildimage
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement ipCidrRouteDest oid (only default routes) (sonic-net#4)
* Implement ipCidrRouteDest oid (default routes) * (doc) * Fix test cases * Clean commented code, refine code on default route missing case, fix MockRedis
- Loading branch information
1 parent
fabc441
commit 5c30a36
Showing
9 changed files
with
210 additions
and
14 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
import json | ||
import ipaddress | ||
from enum import unique, Enum | ||
|
||
from sonic_ax_impl import mibs | ||
from ax_interface import MIBMeta, ValueType, MIBUpdater, ContextualMIBEntry, SubtreeMIBEntry | ||
from ax_interface.encodings import OctetString | ||
from ax_interface.util import mac_decimals | ||
from bisect import bisect_right | ||
|
||
def ip2tuple(ip): | ||
return tuple(int(bs) for bs in str(ip).split('.')) | ||
|
||
class RouteUpdater(MIBUpdater): | ||
def __init__(self): | ||
super().__init__() | ||
self.db_conn, _, _, _, _, _ = mibs.init_sync_d_interface_tables() | ||
# call our update method once to "seed" data before the "Agent" starts accepting requests. | ||
self.update_data() | ||
|
||
def update_data(self): | ||
""" | ||
Update redis (caches config) | ||
Pulls the table references for each interface. | ||
""" | ||
self.route_dest_map = {} | ||
self.route_dest_list = [] | ||
|
||
self.db_conn.connect(mibs.APPL_DB) | ||
route_entries = self.db_conn.keys(mibs.APPL_DB, "ROUTE_TABLE:*") | ||
|
||
for route_entry in route_entries: | ||
routestr = route_entry.decode() | ||
ipnstr = routestr[len("ROUTE_TABLE:"):] | ||
if ipnstr == "0.0.0.0/0": | ||
ipn = ipaddress.ip_network(ipnstr) | ||
ent = self.db_conn.get_all(mibs.APPL_DB, routestr, blocking=True) | ||
nexthops = ent[b"nexthop"].decode() | ||
for nh in nexthops.split(','): | ||
sub_id = ip2tuple(ipn.network_address) + ip2tuple(ipn.netmask) + ip2tuple(nh) | ||
self.route_dest_list.append(sub_id) | ||
self.route_dest_map[sub_id] = ipn.network_address.packed | ||
|
||
self.route_dest_list.sort() | ||
|
||
def route_dest(self, sub_id): | ||
return self.route_dest_map.get(sub_id, None) | ||
|
||
def get_next(self, sub_id): | ||
right = bisect_right(self.route_dest_list, sub_id) | ||
if right >= len(self.route_dest_list): | ||
return None | ||
|
||
return self.route_dest_list[right] | ||
|
||
class IpCidrRouteTable(metaclass=MIBMeta, prefix='.1.3.6.1.2.1.4.24.4'): | ||
""" | ||
'ipCidrRouteDest table in IP Forwarding Table MIB' https://tools.ietf.org/html/rfc4292 | ||
""" | ||
|
||
route_updater = RouteUpdater() | ||
|
||
ipCidrRouteDest = \ | ||
SubtreeMIBEntry('1.1', route_updater, ValueType.IP_ADDRESS, route_updater.route_dest) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
import os | ||
import sys | ||
import ipaddress | ||
|
||
modules_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | ||
sys.path.insert(0, os.path.join(modules_path, 'src')) | ||
|
||
from unittest import TestCase | ||
|
||
import tests.mock_tables.dbconnector | ||
|
||
from ax_interface.mib import MIBTable | ||
from ax_interface.pdu import PDUHeader | ||
from ax_interface.pdu_implementations import GetPDU, GetNextPDU | ||
from ax_interface import ValueType | ||
from ax_interface.encodings import ObjectIdentifier | ||
from ax_interface.constants import PduTypes | ||
from sonic_ax_impl.mibs.ietf import rfc4363 | ||
from sonic_ax_impl.main import SonicMIB | ||
|
||
class TestForwardMIB(TestCase): | ||
@classmethod | ||
def setUpClass(cls): | ||
cls.lut = MIBTable(SonicMIB) | ||
|
||
def test_network_order(self): | ||
ip = ipaddress.ip_address("0.1.2.3") | ||
ipb = ip.packed | ||
ips = ".".join(str(int(x)) for x in list(ipb)) | ||
self.assertEqual(ips, "0.1.2.3") | ||
|
||
def test_getpdu(self): | ||
oid = ObjectIdentifier(23, 0, 1, 0, (1, 3, 6, 1, 2, 1, 4, 24, 4, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 10, 0, 0, 15)) | ||
get_pdu = GetPDU( | ||
header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0), | ||
oids=[oid] | ||
) | ||
|
||
encoded = get_pdu.encode() | ||
response = get_pdu.make_response(self.lut) | ||
print(response) | ||
|
||
value0 = response.values[0] | ||
self.assertEqual(value0.type_, ValueType.IP_ADDRESS) | ||
self.assertEqual(str(value0.name), str(oid)) | ||
self.assertEqual(str(value0.data), ipaddress.ip_address("0.0.0.0").packed.decode()) | ||
|
||
def test_getnextpdu(self): | ||
get_pdu = GetNextPDU( | ||
header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0), | ||
oids=( | ||
ObjectIdentifier(20, 0, 0, 0, (1, 3, 6, 1, 2, 1, 4, 24, 4, 1, 1)), | ||
) | ||
) | ||
|
||
encoded = get_pdu.encode() | ||
response = get_pdu.make_response(self.lut) | ||
print(response) | ||
|
||
n = len(response.values) | ||
value0 = response.values[0] | ||
self.assertEqual(value0.type_, ValueType.IP_ADDRESS) | ||
self.assertEqual(str(value0.data), ipaddress.ip_address("0.0.0.0").packed.decode()) | ||
|
||
def test_getnextpdu_exactmatch(self): | ||
oid = ObjectIdentifier(23, 0, 1, 0, (1, 3, 6, 1, 2, 1, 4, 24, 4, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 10, 0, 0, 17)) | ||
get_pdu = GetNextPDU( | ||
header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0), | ||
oids=[oid] | ||
) | ||
|
||
encoded = get_pdu.encode() | ||
response = get_pdu.make_response(self.lut) | ||
print(response) | ||
|
||
n = len(response.values) | ||
value0 = response.values[0] | ||
self.assertEqual(value0.type_, ValueType.IP_ADDRESS) | ||
print("test_getnextpdu_exactmatch: ", str(oid)) | ||
self.assertEqual(str(value0.name), str(oid)) | ||
self.assertEqual(str(value0.data), ipaddress.ip_address("0.0.0.0").packed.decode()) | ||
|
||
def test_getpdu_noinstance(self): | ||
get_pdu = GetPDU( | ||
header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0), | ||
oids=( | ||
ObjectIdentifier(19, 0, 0, 0, (1, 3, 6, 1, 2, 1, 4, 24, 4, 1, 1, 0, 0, 0, 0, 0, 0, 0, 1)), | ||
) | ||
) | ||
|
||
encoded = get_pdu.encode() | ||
response = get_pdu.make_response(self.lut) | ||
print(response) | ||
|
||
n = len(response.values) | ||
value0 = response.values[0] | ||
self.assertEqual(value0.type_, ValueType.NO_SUCH_INSTANCE) | ||
|
||
def test_getnextpdu_empty(self): | ||
get_pdu = GetNextPDU( | ||
header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0), | ||
oids=( | ||
ObjectIdentifier(11, 0, 0, 0, (1, 3, 6, 1, 2, 1, 4, 24, 4, 1, 2)), | ||
) | ||
) | ||
|
||
encoded = get_pdu.encode() | ||
response = get_pdu.make_response(self.lut) | ||
print(response) | ||
|
||
n = len(response.values) | ||
value0 = response.values[0] | ||
self.assertEqual(value0.type_, ValueType.END_OF_MIB_VIEW) | ||
|