forked from Mirror/frr

Test Scenario: RouterA and RouterB are in the same routing domain and are connected by a P2P link. RouterA is configured with "is-type level-1" while RouterB is configured with "is-type level-1-2". They establish a level-1 adjacency in the UP state. In this scenario, we expect that when RouterB's configuration is switched to "is-type level-2-only", the adjacency on both RouterA and RouterB leaves the UP state. However, RouterB still shows the neighbor as UP. Upon receiving a P2P Hello packet, the function "process_p2p_hello" is invoked. According to the ISO/IEC 10589 protocol specification, section 8.2.5.2 a) and tables 5 and 7, if the "iih->circ_type" of the neighbor's hello packet does not match one's own "circuit->is_type", we may choose to take no action. When an adjacency is being established for the first time, this is harmless: the neighbor simply remains in the "Initializing" state. However, if the adjacency has already been established and one's own "circuit->is_type" then changes, taking no action means the neighbor's UP state is never reset. Therefore, when processing P2P Hello packets, we must take changes in our own circuit's adjacency type into account. Topotest also exposed a crash (core dump) during testing. (gdb) bt "#0 0xb7efe579 in __kernel_vsyscall () \#1 0xb79f62f7 in ?? () \#2 0xbf981dd0 in ?? () \#3 <signal handler called> \#4 0xb79f7722 in ?? () \#5 0xb7ed8634 in _DYNAMIC () from /home/z15467/isis_core/usr/lib/i386-linux-gnu/frr/libfrr.so.0.0.0 \#6 0x0001003c in ?? () \#7 0x00010000 in ?? 
() \#8 0xb7df3322 in _frr_mtx_lock (mutex=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>) at ../lib/frr_pthread.h:255 \#9 event_timer_remain_msec (thread=0x10000) at ../lib/event.c:734 \#10 event_timer_remain_msec (thread=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>) at ../lib/event.c:727 \#11 0x004fb4aa in _send_hello_sched (circuit=<optimized out>, threadp=0x2189de0, level=1, delay=<optimized out>) at ../isisd/isis_pdu.c:2116 \#12 0x004e8dbc in isis_circuit_up (circuit=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>) at ../isisd/isis_circuit.c:734 \#13 0x004ea8f7 in isis_csm_state_change (event=<optimized out>, circuit=<optimized out>, arg=<optimized out>) at ../isisd/isis_csm.c:98 \#14 0x004ea23f in isis_circuit_circ_type_set (circuit=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, circ_type=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>) at ../isisd/isis_circuit.c:1578 \#15 0x0053aefa in lib_interface_isis_network_type_modify (args=<optimized out>) at ../isisd/isis_nb_config.c:4190 \#16 0xb7dbcc8d in nb_callback_modify (errmsg_len=8192, errmsg=0xbf982afc "", resource=0x2186220, dnode=<optimized out>, event=NB_EV_APPLY, nb_node=0x1fafe70, context=<optimized out>) at ../lib/northbound.c:1550 \#17 nb_callback_configuration (context=<optimized out>, event=NB_EV_APPLY, change=<optimized out>, errmsg=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, errmsg_len=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>) at ../lib/northbound.c:1900 \#18 0xb7dbd646 in nb_transaction_process (errmsg_len=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, errmsg=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, transaction=<error reading 
variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, event=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>) at ../lib/northbound.c:2028 \#19 nb_candidate_commit_apply (transaction=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, save_transaction=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, transaction_id=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, errmsg=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, errmsg_len=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>) at ../lib/northbound.c:1368 \#20 0xb7dbdd68 in nb_candidate_commit (context=..., candidate=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, save_transaction=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, comment=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, transaction_id=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, errmsg=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, errmsg_len=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>) at ../lib/northbound.c:1401 \#21 0xb7dc0cff in nb_cli_classic_commit (vty=vty@entry=0x21d6940) at ../lib/northbound_cli.c:57 \#22 0xb7dc0f46 in nb_cli_apply_changes_internal (vty=vty@entry=0x21d6940, xpath_base=xpath_base@entry=0xbf986b7c "/frr-interface:lib/interface[name='r5-eth0']", clear_pending=clear_pending@entry=false) at ../lib/northbound_cli.c:184 \#23 0xb7dc130b in nb_cli_apply_changes (vty=<optimized out>, xpath_base_fmt=<optimized out>) at ../lib/northbound_cli.c:240 \#24 0x00542c1d in isis_network_magic (self=<error reading variable: dwarf2_find_location_expression: 
Corrupted DWARF expression.>, argc=<optimized out>, argv=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, no=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, vty=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>) at ../isisd/isis_cli.c:3101 \#25 isis_network (self=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, vty=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, argc=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, argv=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>) at ./isisd/isis_cli_clippy.c:5499 \#26 0xb7d6d8f1 in cmd_execute_command_real (vline=vline@entry=0x219afa0, vty=vty@entry=0x21d6940, cmd=cmd@entry=0x0, up_level=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>) at ../lib/command.c:1003 \#27 0xb7d6d9e0 in cmd_execute_command (vline=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, vty=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, cmd=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, vtysh=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>) at ../lib/command.c:1061 \#28 0xb7d6dc60 in cmd_execute (vty=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, cmd=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, matched=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, vtysh=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>) at ../lib/command.c:1228 \#29 0xb7dfb58a in vty_command (vty=vty@entry=0x21d6940, buf=0x21e0ff0 ' ' <repeats 12 times>, "isis network 
point-to-point") at ../lib/vty.c:625 \#30 0xb7dfc560 in vty_execute (vty=vty@entry=0x21d6940) at ../lib/vty.c:1388 \#31 0xb7dfdc8d in vtysh_read (thread=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>) at ../lib/vty.c:2400 \#32 0xb7df4d47 in event_call (thread=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>) at ../lib/event.c:2019 \#33 0xb7d9a831 in frr_run (master=<optimized out>) at ../lib/libfrr.c:1232 \#34 0x004e4758 in main (argc=7, argv=0xbf989a24, envp=0xbf989a44) at ../isisd/isis_main.c:354 (gdb) f 9 \#9 event_timer_remain_msec (thread=0x10000) at ../lib/event.c:734 734 ../lib/event.c: No such file or directory. (gdb) p pthread No symbol "pthread" in current context. (gdb) p thread $1 = (struct event *) 0x10000 LAN links and P2P links share the `circuit->u` union of a circuit. If one link type is no longer in use and the union is not cleared, the stale contents defeat the non-NULL checks made for the other link type, and an invalid pointer is accessed. Unfortunately, for non-DIS devices on LAN links, `circuit->u.bc.run_dr_elect[x]` is essentially always 1, but in `isis_circuit_down()`, `circuit->u.bc.run_dr_elect[x]` is not cleared because `circuit->u.bc.is_dr[x]` is always 0. Consequently, when switching to a P2P link, `isis_circuit_circ_type_set()` does not reset the link while it is in a non-C_STATE_UP state, so a later access to `circuit->u.p2p.t_send_p2p_hello` finds a non-NULL yet invalid address. I believe that in `isis_circuit_down()`, the LAN link should unconditionally clear `circuit->u.bc.run_dr_elect[x]`. Signed-off-by: zhou-run <zhou.run@h3c.com>
1006 lines
31 KiB
Python
1006 lines
31 KiB
Python
#!/usr/bin/env python
|
|
# SPDX-License-Identifier: ISC
|
|
|
|
#
|
|
# test_isis_topo1.py
|
|
# Part of NetDEF Topology Tests
|
|
#
|
|
# Copyright (c) 2017 by
|
|
# Network Device Education Foundation, Inc. ("NetDEF")
|
|
#
|
|
|
|
"""
|
|
test_isis_topo1.py: Test ISIS topology.
|
|
"""
|
|
import time
|
|
import datetime
|
|
import functools
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import pytest
|
|
|
|
CWD = os.path.dirname(os.path.realpath(__file__))
|
|
sys.path.append(os.path.join(CWD, "../"))
|
|
|
|
# pylint: disable=C0413
|
|
from lib import topotest
|
|
from lib.common_config import (
|
|
retry,
|
|
stop_router,
|
|
start_router,
|
|
)
|
|
from lib.topogen import Topogen, TopoRouter, get_topogen
|
|
from lib.topolog import logger
|
|
|
|
|
|
# Tag every test in this module with the "isisd" pytest marker.
pytestmark = [pytest.mark.isisd]

# Names of vertex types as strings.
# NOTE(review): no code visible in this part of the file references this
# list; presumably it mirrors the vertex types printed by isisd's topology
# output -- confirm it is still used before changing or removing it.
VERTEX_TYPE_LIST = [
    "pseudo_IS",
    "pseudo_TE-IS",
    "IS",
    "TE-IS",
    "ES",
    "IP internal",
    "IP external",
    "IP TE",
    "IP6 internal",
    "IP6 external",
    "UNKNOWN",
]
|
|
|
|
|
|
def build_topo(tgen):
    "Create the five ISIS routers and the four switches that join them."

    # Resulting topology:
    #
    #   r1      r2
    #    | sw1   | sw2
    #   r3      r4
    #    |       |
    #   sw3     sw4
    #     \     /
    #       r5
    for router_id in range(1, 6):
        tgen.add_router("r{}".format(router_id))

    # (switch name, routers attached to it) -- listed in creation order.
    # NOTE(review): interface names are presumably handed out in attach
    # order (later tests address r3-eth1 and r5-eth0), so keep this order.
    wiring = (
        ("sw1", ("r1", "r3")),
        ("sw2", ("r2", "r4")),
        ("sw3", ("r3", "r5")),
        ("sw4", ("r4", "r5")),
    )
    for switch_name, attached_routers in wiring:
        switch = tgen.add_switch(switch_name)
        for router_name in attached_routers:
            switch.add_link(tgen.gears[router_name])
|
|
|
|
|
|
def setup_module(mod):
    "Start the topology and load the zebra and isisd configs on every router."
    tgen = Topogen(build_topo, mod.__name__)
    tgen.start_topology()

    # Daemon to load on each router, paired with the template of its
    # per-router configuration file (relative to this test's directory).
    daemon_configs = (
        (TopoRouter.RD_ZEBRA, "{}/zebra.conf"),
        (TopoRouter.RD_ISIS, "{}/isisd.conf"),
    )
    for rname, router in tgen.routers().items():
        for daemon, conf_template in daemon_configs:
            conf_path = os.path.join(CWD, conf_template.format(rname))
            router.load_config(daemon, conf_path)

    # With every configuration registered, start the configured daemons.
    tgen.start_router()
|
|
|
|
|
|
def teardown_module():
    "Tear down the whole topology once the module's tests have run."
    get_topogen().stop_topology()
|
|
|
|
|
|
def test_isis_convergence():
    "Wait for the protocol to converge before starting to test"
    tgen = get_topogen()
    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    def compare_isis_topology(router, expected):
        "Helper function to test ISIS topology convergence."
        actual = json.loads(router.vtysh_cmd("show isis topology json"))
        return topotest.json_cmp(actual, expected)

    logger.info("waiting for ISIS protocol to converge")
    for rname, router in tgen.routers().items():
        # Each router has its own reference file: <CWD>/<rname>/<rname>_topology.json
        filename = "{0}/{1}/{1}_topology.json".format(CWD, rname)
        # Context manager so the reference file is closed deterministically.
        with open(filename) as reference_file:
            expected = json.load(reference_file)

        # run_and_expect is given None as the expected value, i.e. the value
        # compare_isis_topology returns when json_cmp reports no difference.
        test_func = functools.partial(compare_isis_topology, router, expected)
        (result, diff) = topotest.run_and_expect(test_func, None, wait=0.5, count=120)
        assert result, "ISIS did not converge on {}:\n{}".format(rname, diff)
|
|
|
|
|
|
def test_isis_route_installation():
    "Check whether all expected routes are present"
    tgen = get_topogen()
    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    def compare_isis_installed_routes(router, expected):
        "Helper function to test ISIS routes installed in rib."
        actual = router.vtysh_cmd("show ip route json", isjson=True)
        return topotest.json_cmp(actual, expected)

    logger.info("Checking routers for installed ISIS routes")

    # Check for routes in 'show ip route json'
    for rname, router in tgen.routers().items():
        filename = "{0}/{1}/{1}_route.json".format(CWD, rname)
        # Context manager so the reference file is closed deterministically.
        with open(filename, "r") as reference_file:
            expected = json.load(reference_file)

        test_func = functools.partial(compare_isis_installed_routes, router, expected)
        (result, _) = topotest.run_and_expect(test_func, None, wait=1, count=10)
        assertmsg = "Router '{}' routes mismatch".format(rname)
        assert result, assertmsg
|
|
|
|
|
|
def test_isis_linux_route_installation():
    "Check whether all expected routes are present and installed in the OS"
    tgen = get_topogen()
    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("Checking routers for installed ISIS routes in OS")

    # Check for routes in `ip route`
    for rname, router in tgen.routers().items():
        filename = "{0}/{1}/{1}_route_linux.json".format(CWD, rname)
        # Context manager so the reference file is closed deterministically.
        with open(filename, "r") as reference_file:
            expected = json.load(reference_file)
        actual = topotest.ip4_route(router)
        assertmsg = "Router '{}' OS routes mismatch".format(rname)
        # json_cmp returning None is treated as "no difference" here.
        assert topotest.json_cmp(actual, expected) is None, assertmsg
|
|
|
|
|
|
def test_isis_route6_installation():
    "Check whether all expected IPv6 routes are present"
    tgen = get_topogen()
    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    def compare_isis_v6_installed_routes(router, expected):
        "Helper function to test ISIS v6 routes installed in rib."
        actual = router.vtysh_cmd("show ipv6 route json", isjson=True)
        return topotest.json_cmp(actual, expected)

    logger.info("Checking routers for installed ISIS IPv6 routes")

    # Check for routes in 'show ipv6 route json'
    for rname, router in tgen.routers().items():
        filename = "{0}/{1}/{1}_route6.json".format(CWD, rname)
        # Context manager so the reference file is closed deterministically.
        with open(filename, "r") as reference_file:
            expected = json.load(reference_file)

        test_func = functools.partial(
            compare_isis_v6_installed_routes, router, expected
        )
        (result, _) = topotest.run_and_expect(test_func, None, wait=1, count=10)
        assertmsg = "Router '{}' routes mismatch".format(rname)
        assert result, assertmsg
|
|
|
|
|
|
def test_isis_linux_route6_installation():
    "Check whether all expected IPv6 routes are present and installed in the OS"
    tgen = get_topogen()
    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("Checking routers for installed ISIS IPv6 routes in OS")

    # Check for routes in `ip route`
    for rname, router in tgen.routers().items():
        filename = "{0}/{1}/{1}_route6_linux.json".format(CWD, rname)
        # Context manager so the reference file is closed deterministically.
        with open(filename, "r") as reference_file:
            expected = json.load(reference_file)
        actual = topotest.ip6_route(router)
        assertmsg = "Router '{}' OS routes mismatch".format(rname)
        # json_cmp returning None is treated as "no difference" here.
        assert topotest.json_cmp(actual, expected) is None, assertmsg
|
|
|
|
|
|
def test_isis_summary_json():
    "Validate the basic structure of 'show isis summary json' on every router"

    tgen = get_topogen()
    # Skip when an earlier test already left a router in a failed state.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("Checking 'show isis summary json'")
    for rname in tgen.routers():
        logger.info("Checking router %s", rname)
        summary = tgen.gears[rname].vtysh_cmd("show isis summary json", isjson=True)
        failure_text = "Test isis summary json failed in '{}' data '{}'".format(
            rname, summary
        )
        assert summary["vrf"] == "default", failure_text
        # Only the first area and its first level are inspected.
        first_area = summary["areas"][0]
        assert first_area["area"] == "1", failure_text
        assert first_area["levels"][0]["id"] != "3", failure_text
|
|
|
|
|
|
def test_isis_interface_json():
    "Validate the structure of 'show isis interface [detail] json' on every router"

    tgen = get_topogen()
    # Skip when an earlier test already left a router in a failed state.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("Checking 'show isis interface json'")

    # The plain and the detailed output are checked the same way: the first
    # circuit of the first area must be the router's eth0 interface.
    commands = ("show isis interface json", "show isis interface detail json")
    for command in commands:
        for rname in tgen.routers():
            logger.info("Checking router %s", rname)
            reply = tgen.gears[rname].vtysh_cmd(command, isjson=True)
            failure_text = "Test isis interface json failed in '{}' data '{}'".format(
                rname, reply
            )
            first_circuit = reply["areas"][0]["circuits"][0]
            assert first_circuit["interface"]["name"] == rname + "-eth0", failure_text
|
|
|
|
|
|
def test_isis_neighbor_json():
    "Validate the structure of 'show isis neighbor [detail] json' on every router"

    tgen = get_topogen()
    # Skip when an earlier test already left a router in a failed state.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    def first_circuit_interface(rname, command):
        "Run command on rname; return (first circuit's interface entry, failure text)."
        reply = tgen.gears[rname].vtysh_cmd(command, isjson=True)
        failure_text = "Test isis neighbor json failed in '{}' data '{}'".format(
            rname, reply
        )
        return reply["areas"][0]["circuits"][0]["interface"], failure_text

    # Debug aid: uncomment to drop into the mininet CLI here.
    # tgen.mininet_cli()
    logger.info("Checking 'show isis neighbor json'")

    # Brief output: the circuit's "interface" entry is compared to the name directly.
    for rname in tgen.routers():
        logger.info("Checking router %s", rname)
        interface, failure_text = first_circuit_interface(
            rname, "show isis neighbor json"
        )
        assert interface == rname + "-eth0", failure_text

    # Detailed output: the name is read from the "name" key of the entry.
    for rname in tgen.routers():
        logger.info("Checking router %s", rname)
        interface, failure_text = first_circuit_interface(
            rname, "show isis neighbor detail json"
        )
        assert interface["name"] == rname + "-eth0", failure_text
|
|
|
|
|
|
def test_isis_neighbor_state():
    "Check that the neighbor states remain normal when the ISIS type is switched."

    tgen = get_topogen()
    # Skip when an earlier test already left a router in a failed state.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    def expect_adjacency(neighbor_expected, state):
        "Check the r3/r5 adjacency from r3's side first, then from r5's side."
        for local, remote in (("r3", "r5"), ("r5", "r3")):
            outcome = _check_isis_neighbor_json(local, remote, neighbor_expected, state)
            assert outcome is True, outcome

    logger.info("Checking 'show isis neighbor state on a p2p link'")

    # Step 1: turn the r3 <-> r5 link into a P2P link.
    # With r3 as level-1-2 and r5 as level-1, every neighbor is expected to
    # exist and to be in the Up state.
    r3 = tgen.gears["r3"]
    r3.vtysh_cmd(
        """
        configure
        router isis 1
        no redistribute ipv4 connected level-1
        no redistribute ipv4 connected level-2
        no redistribute ipv6 connected level-1
        no redistribute ipv6 connected level-2
        interface r3-eth1
        no isis circuit-type
        isis network point-to-point
        end
        """
    )
    r5 = tgen.gears["r5"]
    r5.vtysh_cmd(
        """
        configure
        router isis 1
        no redistribute ipv4 connected level-1
        no redistribute ipv6 connected level-1
        no redistribute ipv4 table 20 level-1
        interface r5-eth0
        no isis circuit-type
        isis network point-to-point
        end
        """
    )
    expect_adjacency(True, "Up")

    # Step 2: switch r3 from level-1-2 to level-2-only while r5 stays level-1.
    # (The redistribute lines removed above would interfere with the type
    # switch.)  The neighbor is now expected to be either absent or in the
    # Initializing state.
    r3.vtysh_cmd(
        """
        configure
        router isis 1
        is-type level-2-only
        end
        """
    )
    expect_adjacency(False, "Initializing")

    # Step 3: put both routers back to their initial configuration and make
    # sure the adjacency comes back Up.
    logger.info("Checking 'restore to initial configuration'")
    r3.vtysh_cmd(
        """
        configure
        interface r3-eth1
        isis circuit-type level-1
        no isis network point-to-point
        router isis 1
        no is-type
        redistribute ipv4 connected level-1
        redistribute ipv4 connected level-2
        redistribute ipv6 connected level-1
        redistribute ipv6 connected level-2
        end
        """
    )
    r5.vtysh_cmd(
        """
        configure
        interface r5-eth0
        isis circuit-type level-1
        no isis network point-to-point
        router isis 1
        redistribute ipv4 connected level-1
        redistribute ipv6 connected level-1
        redistribute ipv4 table 20 level-1
        end
        """
    )
    expect_adjacency(True, "Up")
|
|
|
|
|
|
def test_isis_database_json():
    "Validate the structure of 'show isis database [detail] json' on every router"

    tgen = get_topogen()
    # Skip when an earlier test already left a router in a failed state.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    # Debug aid: uncomment to drop into the mininet CLI here.
    # tgen.mininet_cli()
    logger.info("Checking 'show isis database json'")

    # The plain and the detailed output are validated with the same asserts.
    commands = ("show isis database json", "show isis database detail json")
    for command in commands:
        for rname in tgen.routers():
            logger.info("Checking router %s", rname)
            reply = tgen.gears[rname].vtysh_cmd(command, isjson=True)
            failure_text = "Test isis database json failed in '{}' data '{}'".format(
                rname, reply
            )
            assert reply["areas"][0]["area"]["name"] == "1", failure_text
            assert reply["areas"][0]["levels"][0]["id"] != "3", failure_text
|
|
|
|
|
|
def test_isis_overload_on_startup():
    "Check that overload on startup behaves as expected"

    tgen = get_topogen()
    # Value configured for 'set-overload-bit on-startup'; also used as the
    # upper bound (in seconds) for the measured overload duration below.
    overload_time = 120

    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("Testing overload on startup behavior")

    # Configure set-overload-bit on-startup on r3
    r3 = tgen.gears["r3"]
    r3.vtysh_cmd(
        f"""
          configure
            router isis 1
              set-overload-bit on-startup {overload_time}
        """
    )
    # Restart r3
    logger.info("Stop router")
    stop_router(tgen, "r3")
    logger.info("Start router")

    # Durations are measured with a monotonic clock so that wall-clock
    # adjustments during the (long) test cannot skew the result.
    tstamp_before_start_router = time.monotonic()
    start_router(tgen, "r3")
    tstamp_after_start_router = time.monotonic()
    startup_router_time = tstamp_after_start_router - tstamp_before_start_router

    # Check that the overload bit is set in r3's LSP
    check_lsp_overload_bit("r3", "r3.00-00", "0/0/1")
    check_lsp_overload_bit("r1", "r3.00-00", "0/0/1")

    # Attempt to unset overload bit while timer is still running
    r3.vtysh_cmd(
        """
          configure
            router isis 1
              no set-overload-bit on-startup
              no set-overload-bit
        """
    )

    # Check overload bit is still set
    check_lsp_overload_bit("r1", "r3.00-00", "0/0/1")

    # Check that overload bit is unset after timer completes
    check_lsp_overload_bit("r3", "r3.00-00", "0/0/0")
    tstamp_after_bit_unset = time.monotonic()
    check_lsp_overload_bit("r1", "r3.00-00", "0/0/0")

    # Collect time overloaded
    time_overloaded = tstamp_after_bit_unset - tstamp_after_start_router
    logger.info(f"Time Overloaded: {time_overloaded}")

    # Use time it took to startup router as lower bound
    lower_bound = overload_time - startup_router_time
    logger.info(
        f"Assert that overload time falls in range: {lower_bound} < {time_overloaded} <= {overload_time}"
    )
    result = lower_bound < time_overloaded <= overload_time
    assert result, (
        f"overload lasted {time_overloaded}s, expected a value in "
        f"({lower_bound}, {overload_time}]"
    )
|
|
|
|
|
|
def test_isis_overload_on_startup_cancel_timer():
    "Check that overload on startup timer is cancelled when overload bit is set/unset"

    tgen = get_topogen()
    # Value configured for 'set-overload-bit on-startup'.
    overload_time = 90

    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info(
        "Testing overload on startup behavior with set overload bit: cancel timer"
    )

    # Configure set-overload-bit on-startup on r3, together with the
    # unconditional set-overload-bit
    r3 = tgen.gears["r3"]
    r3.vtysh_cmd(
        f"""
          configure
            router isis 1
              set-overload-bit on-startup {overload_time}
              set-overload-bit
        """
    )
    # Restart r3
    logger.info("Stop router")
    stop_router(tgen, "r3")
    logger.info("Start router")
    start_router(tgen, "r3")

    # Check that the overload bit is set in r3's LSP
    check_lsp_overload_bit("r3", "r3.00-00", "0/0/1")

    # Check that overload timer is running
    check_overload_timer("r3", True)

    # Unset overload bit while timer is running
    r3.vtysh_cmd(
        """
          configure
            router isis 1
              no set-overload-bit
        """
    )

    # Check that overload timer is cancelled
    check_overload_timer("r3", False)

    # Check overload bit is unset
    check_lsp_overload_bit("r3", "r3.00-00", "0/0/0")
|
|
|
|
|
|
def test_isis_overload_on_startup_override_timer():
    "Check that overload bit remains set after overload timer expires if overload bit is configured"

    tgen = get_topogen()
    # Value configured for 'set-overload-bit on-startup'.
    overload_time = 60

    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info(
        "Testing overload on startup behavior with set overload bit: override timer"
    )

    # Configure set-overload-bit on-startup on r3, together with the
    # unconditional set-overload-bit
    r3 = tgen.gears["r3"]
    r3.vtysh_cmd(
        f"""
          configure
            router isis 1
              set-overload-bit on-startup {overload_time}
              set-overload-bit
        """
    )
    # Restart r3
    logger.info("Stop router")
    stop_router(tgen, "r3")
    logger.info("Start router")
    start_router(tgen, "r3")

    # Check that the overload bit is set in r3's LSP
    check_lsp_overload_bit("r3", "r3.00-00", "0/0/1")

    # Check that overload timer is running
    check_overload_timer("r3", True)

    # Check that overload timer expired
    # NOTE(review): presumably check_overload_timer waits/retries long enough
    # for the timer to run out -- confirm against its definition.
    check_overload_timer("r3", False)

    # Check overload bit is still set
    check_lsp_overload_bit("r3", "r3.00-00", "0/0/1")
|
|
|
|
|
|
def test_isis_advertise_passive_only():
    """Check that we only advertise prefixes of passive interfaces when advertise-passive-only is enabled."""
    tgen = get_topogen()
    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("Testing isis advertise-passive-only behavior")
    # Prefixes expected in r1's LSP before advertise-passive-only is enabled.
    expected_prefixes_no_advertise_passive_only = {
        "10.0.20.0/24",
        "10.254.0.1/32",
        "2001:db8:f::1/128",
        "2001:db8:1:1::/64",
    }
    # Prefixes expected once advertise-passive-only is enabled.
    expected_prefixes_advertise_passive_only = {"10.254.0.1/32", "2001:db8:f::1/128"}
    lsp_id = "r1.00-00"

    # Drop level-2 redistribution and make the loopback a passive ISIS
    # interface.
    r1 = tgen.gears["r1"]
    r1.vtysh_cmd(
        """
        configure
         router isis 1
          no redistribute ipv4 connected level-2
          no redistribute ipv6 connected level-2
         interface lo
          ip router isis 1
          ipv6 router isis 1
          isis passive
        end
        """
    )

    result = check_advertised_prefixes(
        r1, lsp_id, expected_prefixes_no_advertise_passive_only
    )
    assert result is True, result

    r1.vtysh_cmd(
        """
        configure
         router isis 1
          advertise-passive-only
        end
        """
    )

    result = check_advertised_prefixes(
        r1, lsp_id, expected_prefixes_advertise_passive_only
    )
    assert result is True, result
|
|
|
|
|
|
def test_isis_hello_padding_during_adjacency_formation():
    """Check that IIH packets is only padded when adjacency is still being formed
    when isis hello padding during-adjacency-formation is configured
    """
    tgen = get_topogen()
    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("Testing isis hello padding during-adjacency-formation behavior")
    r3 = tgen.gears["r3"]

    # Reduce hello-multiplier to make the adjacency go down faster.
    r3.vtysh_cmd(
        """
        configure
         interface r3-eth0
          isis hello-multiplier 2
        """
    )

    # Enable the feature under test on r1 and turn on adjacency packet
    # debugging; the padding check below is given r1 to inspect.
    r1 = tgen.gears["r1"]
    r1.vtysh_cmd(
        """
        configure
         interface r1-eth0
          isis hello padding during-adjacency-formation
        end
        debug isis adj-packets
        """
    )
    # Adjacency is established: hellos must not be padded.
    result = check_last_iih_packet_for_padding(r1, expect_padding=False)
    assert result is True, result

    # Shut r3's side of the link; hellos from r1 are then expected to be
    # padded again.
    r3.vtysh_cmd(
        """
        configure
         interface r3-eth0
          shutdown
        """
    )
    result = check_last_iih_packet_for_padding(r1, expect_padding=True)
    assert result is True, result

    # Bring the link back; hellos are expected to be unpadded again.
    r3.vtysh_cmd(
        """
        configure
         interface r3-eth0
          no shutdown
        """
    )
    result = check_last_iih_packet_for_padding(r1, expect_padding=False)
    assert result is True, result
|
|
|
|
|
|
def _check_isis_neighbor_json(
    self, neighbor, neighbor_expected, neighbor_state_expected
):
    """Check presence and state of one ISIS neighbor on a router.

    self: name of the router to query (key into tgen.gears).
    neighbor: name of the neighbor to look for.
    neighbor_expected: whether the neighbor must be present.
    neighbor_state_expected: state the neighbor must report when present.

    Returns True on success, otherwise the diagnostic produced by the failing
    check.  A missing neighbor counts as success when neighbor_expected is
    false; a present neighbor always has its state checked.
    """
    tgen = get_topogen()
    router = tgen.gears[self]
    logger.info(
        f"check_isis_neighbor_json {router} {neighbor} {neighbor_expected} {neighbor_state_expected}"
    )

    result = _check_isis_neighbor_exist(self, neighbor)
    # Anything other than the literal True is a failure diagnostic.
    if result is True:
        return _check_isis_neighbor_state(self, neighbor, neighbor_state_expected)
    if neighbor_expected:
        return "{} with expected neighbor {} got none ".format(router.name, neighbor)
    return True
|
|
|
|
|
|
@retry(retry_timeout=60)
def _check_isis_neighbor_exist(self, neighbor):
    """Check whether a router currently lists the given ISIS neighbor.

    self: name of the router to query (key into tgen.gears).
    neighbor: neighbor name compared against each circuit's "adj" entry.

    Returns True when a circuit of the first area lists the neighbor,
    otherwise a diagnostic string.
    NOTE(review): presumably the retry decorator re-runs this function while
    it returns a string, for up to retry_timeout seconds -- confirm.
    """
    tgen = get_topogen()
    router = tgen.gears[self]
    logger.info(f"check_isis_neighbor_exist {router} {neighbor}")
    neighbor_json = router.vtysh_cmd("show isis neighbor json", isjson=True)

    # Only the first area is inspected.  Fall back to an empty area so that
    # missing or empty output yields the diagnostic below, not an IndexError.
    areas = neighbor_json.get("areas") or [{}]
    circuits = areas[0].get("circuits", [])
    for circuit in circuits:
        if "adj" in circuit and circuit["adj"] == neighbor:
            return True

    return "The neighbor {} of router {} has not been learned yet ".format(
        neighbor, router.name
    )
|
|
|
|
|
|
@retry(retry_timeout=5)
def _check_isis_neighbor_state(self, neighbor, neighbor_state_expected):
    """Check the state a router reports for the given ISIS neighbor.

    self: name of the router to query (key into tgen.gears).
    neighbor: neighbor name passed to 'show isis neighbor <name> json'.
    neighbor_state_expected: state string the neighbor must report.

    Returns True when a circuit of the first area reports the expected state,
    otherwise a diagnostic string naming the last state seen (None when no
    circuit reported a state at all).
    """
    tgen = get_topogen()
    router = tgen.gears[self]
    logger.info(
        f"check_isis_neighbor_state {router} {neighbor} {neighbor_state_expected}"
    )
    neighbor_json = router.vtysh_cmd(
        "show isis neighbor {} json".format(neighbor), isjson=True
    )

    # Pre-set so the diagnostic below can be built even when no circuit
    # carries a "state" entry (previously an UnboundLocalError).
    neighbor_state = None
    # Only the first area is inspected.  Fall back to an empty area so that
    # missing or empty output yields the diagnostic, not an IndexError.
    areas = neighbor_json.get("areas") or [{}]
    circuits = areas[0].get("circuits", [])
    for circuit in circuits:
        interface = circuit.get("interface", {})
        if "state" in interface:
            neighbor_state = interface["state"]
            if neighbor_state == neighbor_state_expected:
                return True

    return "{} peer with expected neighbor_state {} got {} ".format(
        router.name, neighbor_state_expected, neighbor_state
    )
|
|
|
|
|
|
@retry(retry_timeout=10)
def check_last_iih_packet_for_padding(router, expect_padding):
    """
    Check whether the last IIH that ``router`` logged as sent was padded.

    The last "Sending ... IIH" line of the router's isisd.log supplies the
    interface name and packet length; the length is compared against the
    interface MTU taken from "show interface <name> json".

    :param router: object providing ``gearlogdir`` and ``vtysh_cmd``
    :param expect_padding: truthy to require a packet of exactly MTU - 3
        bytes, falsy to require a strictly shorter one
    :return: True on success, otherwise a string describing the failure
    """
    logfilename = "{}/{}".format(router.gearlogdir, "isisd.log")
    last_hello_packet_line = None
    with open(logfilename, "r") as f:
        # Stream the log instead of loading it whole: only the final
        # matching line is needed.
        for line in f:
            if re.search("Sending .+? IIH", line):
                last_hello_packet_line = line

    if last_hello_packet_line is None:
        return "Expected IIH packet in {}, but no packet found".format(logfilename)

    details = re.search(
        r"Sending .+ IIH on (.+), length (\d+)", last_hello_packet_line
    )
    if details is None:
        # The line mentions an IIH but not in the "on <if>, length <n>" form;
        # report it rather than failing on a None match object.
        return "Could not parse interface and length from IIH log line: {}".format(
            last_hello_packet_line.strip()
        )

    interface_name, packet_length = details.group(1, 2)
    packet_length = int(packet_length)
    interface_output = router.vtysh_cmd("show interface {} json".format(interface_name))
    interface_json = json.loads(interface_output)
    # NOTE(review): the 3 bytes presumably account for the LLC header - confirm.
    padded_packet_length = interface_json[interface_name]["mtu"] - 3

    if expect_padding:
        if packet_length == padded_packet_length:
            return True
        return (
            "Expected padded packet with length {}, got packet with length {}".format(
                padded_packet_length, packet_length
            )
        )

    if packet_length < padded_packet_length:
        return True
    return "Expected unpadded packet with length less than {}, got packet with length {}".format(
        padded_packet_length, packet_length
    )
|
|
|
|
|
|
@retry(retry_timeout=5)
def check_advertised_prefixes(router, lsp_id, expected_prefixes):
    """
    Compare the prefixes advertised in an LSP against an expected set.

    Prefixes are collected from the "IP Reachability" / "IPv6 Reachability"
    lines of "show isis database detail <lsp_id>" that carry metric 10.

    :param router: object providing ``vtysh_cmd``
    :param lsp_id: LSP identifier passed to the show command
    :param expected_prefixes: set of prefix strings that must match exactly
    :return: True on an exact match, otherwise a string showing both sets
    """
    output = router.vtysh_cmd("show isis database detail {}".format(lsp_id))
    prefixes = set(re.findall(r"IP(?:v6)? Reachability: (.*) \(Metric: 10\)", output))
    if prefixes == expected_prefixes:
        return True
    # Diagnostic only: both sets are shown so the difference can be read off.
    return str({"expected_prefixes": expected_prefixes, "prefixes": prefixes})
|
|
|
|
|
|
@retry(retry_timeout=200)
def _check_lsp_overload_bit(router, overloaded_router_lsp, att_p_ol_expected):
    """
    Verify the "attPOl" field of an LSP in the router's IS-IS database.

    The LSP is read from the second entry of the first area's "levels" list
    in "show isis database <lsp> json".

    :param router: key used to look the router up in ``tgen.gears``
    :param overloaded_router_lsp: LSP identifier passed to the show command
    :param att_p_ol_expected: expected value of the LSP's "attPOl" field
    :return: True on a match, otherwise a string describing the failure
    """
    router = get_topogen().gears[router]
    logger.info(f"check_overload_bit {router}")

    raw_database = router.vtysh_cmd(
        "show isis database {} json".format(overloaded_router_lsp)
    )
    level_entry = json.loads(raw_database)["areas"][0]["levels"][1]

    if "lsps" not in level_entry:
        return "The LSP of {} has not been synchronized yet ".format(router.name)

    observed = level_entry["lsps"][0]["attPOl"]
    if observed == att_p_ol_expected:
        return True

    return "{} peer with expected att_p_ol {} got {} ".format(
        router.name, att_p_ol_expected, observed
    )
|
|
|
|
|
|
def check_lsp_overload_bit(router, overloaded_router_lsp, att_p_ol_expected):
    """
    Assert that ``_check_lsp_overload_bit`` succeeds for the given LSP.

    Any non-True result of the check is used as the assertion message.
    """
    outcome = _check_lsp_overload_bit(router, overloaded_router_lsp, att_p_ol_expected)
    assert outcome is True, outcome
|
|
|
|
|
|
@retry(retry_timeout=200)
def _check_overload_timer(router, timer_expected):
    """
    Verify whether the overload-on-startup timer is listed as running.

    The timer counts as running when "set_overload_on_start_timer" appears
    in the output of "show event timers".

    :param router: key used to look the router up in ``tgen.gears``
    :param timer_expected: True if the timer should be listed, False if not
    :return: True when the observation matches, otherwise an error string
    """
    gear = get_topogen().gears[router]
    timers_dump = gear.vtysh_cmd("show event timers")

    is_listed = "set_overload_on_start_timer" in timers_dump
    if is_listed == timer_expected:
        return True
    return "Expected timer running status: {}".format(timer_expected)
|
|
|
|
|
|
def check_overload_timer(router, timer_expected):
    """
    Assert that ``_check_overload_timer`` succeeds for the given router.

    Any non-True result of the check is used as the assertion message.
    """
    outcome = _check_overload_timer(router, timer_expected)
    assert outcome is True, outcome
|
|
|
|
|
|
def test_memory_leak():
    """Report memory leaks through topogen, or skip if leak checks are off."""
    topology = get_topogen()
    if topology.is_memleak_enabled():
        topology.report_memory_leaks()
        return

    # pytest.skip() raises, so this marks the test as skipped.
    pytest.skip("Memory leak test/report is disabled")
|
|
|
|
|
|
if __name__ == "__main__":
    # Run this file under pytest with "-s" placed ahead of any arguments
    # given on the command line, and exit with pytest's return code.
    sys.exit(pytest.main(["-s", *sys.argv[1:]]))
|
|
|
|
|
|
#
|
|
# Auxiliary functions
|
|
#
|
|
|
|
|
|
def dict_merge(dct, merge_dct):
    """
    Recursive dict merge. Inspired by :meth:``dict.update()``, instead of
    updating only top-level keys, dict_merge recurses down into dicts nested
    to an arbitrary depth, updating keys. The ``merge_dct`` is merged into
    ``dct``.

    A key is merged recursively only when the existing value is a dict and
    the incoming value is a mapping; in every other case the incoming value
    replaces the existing one.

    :param dct: dict onto which the merge is executed (modified in place)
    :param merge_dct: dct merged into dct
    :return: None

    Source:
    https://gist.github.com/angstwad/bf22d1822c38a92ec0a9
    """
    # Take the value straight from items() rather than re-indexing merge_dct.
    for key, incoming in merge_dct.items():
        if (
            key in dct
            and isinstance(dct[key], dict)
            and topotest.is_mapping(incoming)
        ):
            dict_merge(dct[key], incoming)
        else:
            dct[key] = incoming
|
|
|
|
|
|
def parse_topology(lines, level):
    """
    Parse the output of 'show isis topology level-X' into a Python dict.

    The patterns below match fields separated by exactly one space.

    :param lines: iterable of output lines
    :param level: key under which each area's parsed entries are stored
    :return: ``{area: {level: {"ipv4": [...], "ipv6": [...]}}}`` where every
        list element is a dict with a "vertex" key and, depending on how many
        columns the line had, "type", "metric", "next-hop", "interface" and
        "parent" keys (all values are strings)
    """
    header_fields = ("Vertex", "Type", "Metric", "Next-Hop", "Interface", "Parent")
    vertex_type_regex = "|".join(VERTEX_TYPE_LIST)

    # Build every pattern once up front instead of once per input line.
    area_re = re.compile(r"Area (.+):")
    ipv6_re = re.compile(r"IS\-IS paths to level-. routers that speak IPv6")
    ipv4_re = re.compile(r"IS\-IS paths to level-. routers that speak IP")
    six_fields_re = re.compile(
        r"([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+)"
    )
    full_entry_re = re.compile(
        r"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+) ([^\s]+) ([^\s]+)".format(
            vertex_type_regex
        )
    )
    short_entry_re = re.compile(
        r"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+)".format(vertex_type_regex)
    )
    vertex_only_re = re.compile(r"([^\s]+)")

    areas = {}
    area = None
    ipv = None

    for line in lines:
        area_match = area_re.match(line)
        if area_match:
            area = area_match.group(1)
            if area not in areas:
                areas[area] = {level: {"ipv4": [], "ipv6": []}}
            # A new area starts without an address family selected.
            ipv = None
            continue
        elif area is None:
            # Ignore everything that precedes the first "Area" line.
            continue

        # The IPv6 header must be tested first: the IPv4 pattern is a prefix
        # of it and would match IPv6 headers as well.
        if ipv6_re.match(line):
            ipv = "ipv6"
            continue
        if ipv4_re.match(line):
            ipv = "ipv4"
            continue

        header_match = six_fields_re.match(line)
        if header_match is not None and header_match.groups() == header_fields:
            # Skip header
            continue

        if ipv is None:
            # No address-family header seen yet for this area, so there is
            # no list to file the line under; skip it instead of indexing
            # the result with a None key.
            continue

        entries = areas[area][level][ipv]

        # Group 4 is the inner alternative of the metric pattern, which is
        # why the fields after the metric start at group 5.
        item_match = full_entry_re.match(line)
        if item_match is not None:
            entries.append(
                {
                    "vertex": item_match.group(1),
                    "type": item_match.group(2),
                    "metric": item_match.group(3),
                    "next-hop": item_match.group(5),
                    "interface": item_match.group(6),
                    "parent": item_match.group(7),
                }
            )
            continue

        item_match = short_entry_re.match(line)
        if item_match is not None:
            entries.append(
                {
                    "vertex": item_match.group(1),
                    "type": item_match.group(2),
                    "metric": item_match.group(3),
                    "parent": item_match.group(5),
                }
            )
            continue

        # Fallback: any other non-blank line contributes only its first token.
        item_match = vertex_only_re.match(line)
        if item_match is not None:
            entries.append({"vertex": item_match.group(1)})
            continue

    return areas
|