#!/usr/bin/env python # SPDX-License-Identifier: ISC # # test_pim_boundary_acl.py # # Copyright (c) 2024 Architecture Technology Corporation # Corey Siltala # """ test_pim_boundary_acl.py: Test multicast boundary commands (access-lists and prefix-lists) """ import os import sys import pytest import json from functools import partial pytestmark = [pytest.mark.pimd] CWD = os.path.dirname(os.path.realpath(__file__)) sys.path.append(os.path.join(CWD, "../")) # pylint: disable=C0413 from lib import topotest from lib.topogen import Topogen, TopoRouter, get_topogen from lib.topolog import logger ASM_GROUP="229.1.1.1" SSM_GROUP="232.1.1.1" def build_topo(tgen): "Build function" for routern in range(1, 4): tgen.add_router("r{}".format(routern)) tgen.add_router("rp") # rp ------ r1 -------- r2 # \ # --------- r3 # r1 -> .1 # r2 -> .2 # rp -> .3 # r3 -> .4 # loopback network is 10.254.0.X/32 # # r1 <- sw1 -> r2 # r1-eth0 <-> r2-eth0 # 10.0.20.0/24 sw = tgen.add_switch("sw1") sw.add_link(tgen.gears["r1"]) sw.add_link(tgen.gears["r2"]) # r1 <- sw2 -> rp # r1-eth1 <-> rp-eth0 # 10.0.30.0/24 sw = tgen.add_switch("sw2") sw.add_link(tgen.gears["r1"]) sw.add_link(tgen.gears["rp"]) # r1 <- sw3 -> r3 # r1-eth2 <-> r3-eth0 # 10.0.40.0/24 sw = tgen.add_switch("sw3") sw.add_link(tgen.gears["r1"]) sw.add_link(tgen.gears["r3"]) def setup_module(mod): "Sets up the pytest environment" tgen = Topogen(build_topo, mod.__name__) tgen.start_topology() # For all registered routers, load the zebra configuration file for rname, router in tgen.routers().items(): logger.info("Loading router %s" % rname) router.load_frr_config(os.path.join(CWD, "{}/frr.conf".format(rname))) # After loading the configurations, this function loads configured daemons. tgen.start_router() # tgen.mininet_cli() def teardown_module(): "Teardown the pytest environment" tgen = get_topogen() # This function tears down the whole topology. tgen.stop_topology() def test_pim_rp_setup(): "Ensure basic routing has come up and the rp has an outgoing interface" # Ensure rp and r1 establish pim neighbor ship and bgp has come up # Finally ensure that the rp has an outgoing interface on r1 tgen = get_topogen() r1 = tgen.gears["r1"] expected = { "10.254.0.3":[ { "outboundInterface":"r1-eth1", "group":"224.0.0.0/4", "source":"Static" } ] } test_func = partial( topotest.router_json_cmp, r1, "show ip pim rp-info json", expected ) _, result = topotest.run_and_expect(test_func, None, count=30, wait=1) assertmsg = '"{}" JSON output mismatches'.format(r1.name) assert result is None, assertmsg # tgen.mininet_cli() def test_pim_asm_igmp_join_acl(): "Test ASM IGMP joins with prefix-list ACLs" logger.info("Send IGMP joins from r2 to r1 with ACL enabled and disabled") tgen = get_topogen() if tgen.routers_have_failure(): pytest.skip(tgen.errors) r2 = tgen.gears["r2"] r1 = tgen.gears["r1"] # No IGMP sources other than from self for AutoRP Discovery group initially expected = { "r1-eth0":{ "name":"r1-eth0", "224.0.1.40":"*", "229.1.1.1":None }, "r1-eth2":{ "name":"r1-eth2", "224.0.1.40":"*", "229.1.1.1":None } } test_func = partial( topotest.router_json_cmp, r1, "show ip igmp sources json", expected ) _, result = topotest.run_and_expect(test_func, None, count=20, wait=1) assert result is None, "Expected no IGMP sources other than for AutoRP Discovery" # Send IGMP join from r2, check if r1 has IGMP source r2.vtysh_cmd(( """ configure terminal interface {} ip igmp join {} """ ).format("r2-eth0", ASM_GROUP)) expected = { "r1-eth0":{ "name":"r1-eth0", "229.1.1.1":{ "group":"229.1.1.1", "sources":[ { "source":"*", "timer":"--:--", "forwarded":False, "uptime":"*" } ] } } } test_func = partial( topotest.router_json_cmp, r1, "show ip igmp sources json", expected ) _, result = topotest.run_and_expect(test_func, None, count=20, wait=1) assert result is None, "Expected IGMP source to be present but is absent" # Test inbound boundary on r1 # Enable multicast boundary on r1, toggle IGMP join on r2 r2.vtysh_cmd(( """ configure terminal interface r2-eth0 no ip igmp join {} """ ).format(ASM_GROUP)) r1.vtysh_cmd( """ configure terminal interface r1-eth0 ip multicast boundary oil pim-oil-plist """ ) r2.vtysh_cmd(( """ configure terminal interface r2-eth0 ip igmp join {} """ ).format(ASM_GROUP)) expected = { "r1-eth0":{ "name":"r1-eth0", "229.1.1.1":None } } test_func = partial( topotest.router_json_cmp, r1, "show ip igmp sources json", expected ) _, result = topotest.run_and_expect(test_func, None, count=20, wait=1) assert result is None, "Expected IGMP source to be absent but is present" # Test outbound boundary on r2 # Enable multicast boundary on r2, toggle IGMP join (test outbound) # Note: json_cmp treats "*" as wildcard but in this case that's actually what the source is expected = { "vrf":"default", "r2-eth0":{ "name":"r2-eth0", "groups":[ { "source":"*", "group":"229.1.1.1", "primaryAddr":"10.0.20.2", "sockFd":"*", "upTime":"*" } ] } } test_func = partial( topotest.router_json_cmp, r2, "show ip igmp join json", expected ) _, result = topotest.run_and_expect(test_func, None, count=20, wait=1) assert result is None, "Expected IGMP join to be present but is absent" r2.vtysh_cmd(( """ configure terminal interface r2-eth0 no ip igmp join {} ip multicast boundary oil pim-oil-plist ip igmp join {} """ ).format(ASM_GROUP, ASM_GROUP)) expected = { "vrf":"default", "r2-eth0":None } test_func = partial( topotest.router_json_cmp, r2, "show ip igmp join json", expected ) _, result = topotest.run_and_expect(test_func, None, count=20, wait=1) assert result is None, "Expected IGMP join to be absent but is present" # Cleanup r2.vtysh_cmd(( """ configure terminal interface r2-eth0 no ip igmp join {} no ip multicast boundary oil pim-oil-plist """ ).format(ASM_GROUP)) def test_pim_ssm_igmp_join_acl(): "Test SSM IGMP joins with extended ACLs" logger.info("Send IGMP joins from r2 to r1 with ACL enabled and disabled") tgen = get_topogen() if tgen.routers_have_failure(): pytest.skip(tgen.errors) r3 = tgen.gears["r3"] r2 = tgen.gears["r2"] r1 = tgen.gears["r1"] # No IGMP sources other than from self for AutoRP Discovery group initially expected = { "r1-eth0":{ "name":"r1-eth0", "224.0.1.40":"*", "229.1.1.1":None, "232.1.1.1":None }, "r1-eth2":{ "name":"r1-eth2", "224.0.1.40":"*", "229.1.1.1":None, "232.1.1.1":None } } test_func = partial( topotest.router_json_cmp, r1, "show ip igmp sources json", {} ) _, result = topotest.run_and_expect(test_func, None, count=20, wait=1) assert result is None, "Expected no IGMP sources other than from AutoRP Discovery" # Send IGMP join from r2, check if r1 has IGMP source r2.vtysh_cmd(( """ configure terminal interface r2-eth0 ip igmp join {} 10.0.20.2 """ ).format(SSM_GROUP)) expected = { "r1-eth0":{ "name":"r1-eth0", "232.1.1.1":{ "group":"232.1.1.1", "sources":[ { "source":"10.0.20.2", "timer":"*", "forwarded":False, "uptime":"*" } ] } } } test_func = partial( topotest.router_json_cmp, r1, "show ip igmp sources json", expected ) _, result = topotest.run_and_expect(test_func, None, count=20, wait=1) assert result is None, "Expected IGMP source to be present but is absent" # Test inbound boundary on r1 # Enable multicast boundary on r1, toggle IGMP join on r2 r2.vtysh_cmd(( """ configure terminal interface r2-eth0 no ip igmp join {} 10.0.20.2 """ ).format(SSM_GROUP)) r1.vtysh_cmd( """ configure terminal interface r1-eth0 ip multicast boundary pim-acl """ ) r2.vtysh_cmd(( """ configure terminal interface r2-eth0 ip igmp join {} 10.0.20.2 """ ).format(SSM_GROUP)) expected = { "r1-eth0":{ "name":"r1-eth0", "232.1.1.1":None } } test_func = partial( topotest.router_json_cmp, r1, "show ip igmp sources json", expected ) _, result = topotest.run_and_expect(test_func, None, count=20, wait=1) assert result is None, "Expected IGMP source to be absent but is present" # Add lower, more-specific permit rule to access-list r2.vtysh_cmd(( """ configure terminal interface r2-eth0 no ip igmp join {} 10.0.20.2 """ ).format(SSM_GROUP)) r1.vtysh_cmd(( """ configure terminal access-list pim-acl seq 5 permit ip host 10.0.20.2 {} 0.0.0.128 """ ).format(SSM_GROUP)) r2.vtysh_cmd(( """ configure terminal interface r2-eth0 ip igmp join {} 10.0.20.2 """ ).format(SSM_GROUP)) expected = { "r1-eth0":{ "name":"r1-eth0", "232.1.1.1":{ "group":"232.1.1.1", "sources":[ { "source":"10.0.20.2", "timer":"*", "forwarded":False, "uptime":"*" } ] } } } test_func = partial( topotest.router_json_cmp, r1, "show ip igmp sources json", expected ) _, result = topotest.run_and_expect(test_func, None, count=20, wait=1) assert result is None, "Expected IGMP source to be present but is absent" # Test outbound boundary on r2 # Enable multicast boundary on r2, toggle IGMP join (test outbound) expected = { "vrf":"default", "r2-eth0":{ "name":"r2-eth0", "groups":[ { "source":"10.0.20.2", "group":"232.1.1.1", "primaryAddr":"10.0.20.2", "sockFd":"*", "upTime":"*" } ] } } test_func = partial( topotest.router_json_cmp, r2, "show ip igmp join json", expected ) _, result = topotest.run_and_expect(test_func, None, count=20, wait=1) assert result is None, "Expected IGMP join to be present but is absent" # Enable boundary ACL, check join is absent r2.vtysh_cmd(( """ configure terminal interface r2-eth0 no ip igmp join {} 10.0.20.2 ip multicast boundary pim-acl ip igmp join {} 10.0.20.2 """ ).format(SSM_GROUP, SSM_GROUP)) expected = { "vrf":"default", "r2-eth0":None } test_func = partial( topotest.router_json_cmp, r2, "show ip igmp join json", expected ) _, result = topotest.run_and_expect(test_func, None, count=20, wait=1) assert result is None, "Expected IGMP join to be absent but is present" # Check sources on r1 again, should be absent even though we permitted it because r2 is blocking it outbound expected = { "r1-eth0":{ "name":"r1-eth0", "232.1.1.1":None }, "r1-eth2":{ "name":"r1-eth2", "232.1.1.1":None } } test_func = partial( topotest.router_json_cmp, r1, "show ip igmp sources json", expected ) _, result = topotest.run_and_expect(test_func, None, count=20, wait=1) assert result is None, "Expected IGMP source to be absent but is present" # Send IGMP join from r3 with different source, should show up on r1 # Add lower, more-specific permit rule to access-list r3.vtysh_cmd(( """ configure terminal interface r3-eth0 ip igmp join {} 10.0.40.4 """ ).format(SSM_GROUP)) expected = { "r1-eth0":{ "name":"r1-eth0", "232.1.1.1":None }, "r1-eth2":{ "name":"r1-eth2", "232.1.1.1":{ "group":"232.1.1.1", "sources":[ { "source":"10.0.40.4", "timer":"*", "forwarded":False, "uptime":"*" } ] } } } test_func = partial( topotest.router_json_cmp, r1, "show ip igmp sources json", expected ) _, result = topotest.run_and_expect(test_func, None, count=20, wait=1) assert result is None, "Expected IGMP source to be present but is absent" # PIM join # PIM-DM forwarding def test_memory_leak(): "Run the memory leak test and report results." tgen = get_topogen() if not tgen.is_memleak_enabled(): pytest.skip("Memory leak test/report is disabled") tgen.report_memory_leaks() if __name__ == "__main__": args = ["-s"] + sys.argv[1:] sys.exit(pytest.main(args))