#
# Copyright (c) 2020 by VMware, Inc. ("VMware")
# Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
# ("NetDEF") in this file.
#
# Permission to use, copy, modify, and/or distribute this software
# for any purpose with or without fee is hereby granted, provided
# that the above copyright notice and this permission notice appear
# in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
# OF THIS SOFTWARE.
#

from copy import deepcopy
import traceback
from time import sleep
from lib.topolog import logger
import ipaddr

# Import common_config to use commonly used APIs
from lib.common_config import (create_common_configuration,
                               InvalidCLIError, retry,
                               generate_ips,
                               check_address_types,
                               validate_ip_address,
                               run_frr_cmd)

LOGDIR = "/tmp/topotests/"
TMPDIR = None


################################
# Configure procs
################################

def create_router_ospf(
        tgen, topo, input_dict=None, build=False, load_config=True):
    """
    API to configure ospf on router.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `build` : Only for initial setup phase this is set as True.
    * `load_config` : Loading the config to router this is set as True.

    Usage
    -----
    input_dict = {
        "r1": {
            "ospf": {
                "router_id": "22.22.22.22",
                "area": [{"id": "0.0.0.0", "type": "nssa"}]
            }
        }
    }

    result = create_router_ospf(tgen, topo, input_dict)

    Returns
    -------
    Result of the last per-router configuration call (True on success, or
    the traceback string produced by the helper); False when no router in
    the input carries an 'ospf' section.
    """
    logger.debug("Entering lib API: create_router_ospf()")
    result = False

    # Work on a private copy so the caller's data is never modified.
    if not input_dict:
        input_dict = deepcopy(topo)
    else:
        input_dict = deepcopy(input_dict)

    for router in input_dict.keys():
        if "ospf" not in input_dict[router]:
            logger.debug("Router %s: 'ospf' not present in input_dict", router)
            continue

        result = __create_ospf_global(
            tgen, input_dict, router, build, load_config)

    logger.debug("Exiting lib API: create_router_ospf()")
    return result


def __create_ospf_global(
        tgen, input_dict, router, build=False, load_config=True):
    """
    Helper API to create ospf global configuration.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router to be configured.
    * `build` : Only for initial setup phase this is set as True.
    * `load_config` : Loading the config to router this is set as True.

    Returns
    -------
    Value returned by create_common_configuration(), or the formatted
    traceback string when InvalidCLIError is raised.
    """
    result = False
    logger.debug("Entering lib API: __create_ospf_global()")
    try:
        ospf_data = input_dict[router]["ospf"]

        # Removing the whole OSPF instance short-circuits everything else.
        if ospf_data.get("delete", False):
            config_data = ["no router ospf"]
            result = create_common_configuration(tgen, router, config_data,
                                                 "ospf", build,
                                                 load_config)
            return result

        config_data = ["router ospf"]

        # router id
        router_id = ospf_data.get("router_id", None)
        if ospf_data.get("del_router_id", False):
            config_data.append("no ospf router-id")
        if router_id:
            config_data.append("ospf router-id {}".format(router_id))

        # redistribute command
        for redistribute in ospf_data.get("redistribute", None) or []:
            if "redist_type" not in redistribute:
                logger.debug("Router %s: 'redist_type' not present in "
                             "input_dict", router)
                continue

            cmd = "redistribute {}".format(redistribute["redist_type"])
            # Only the route-map belonging to THIS entry is attached; other
            # entries' route-maps must not leak into this command.
            if "route_map" in redistribute:
                cmd = cmd + " route-map {}".format(redistribute["route_map"])
            if redistribute.get("delete", False):
                cmd = "no {}".format(cmd)
            config_data.append(cmd)

        # area information
        for area in ospf_data.get("area", None) or []:
            if "id" not in area:
                logger.debug("Router %s: 'area id' not present in "
                             "input_dict", router)
                continue

            cmd = "area {}".format(area["id"])
            if "type" in area:
                cmd = cmd + " {}".format(area["type"])
            if area.get("delete", False):
                cmd = "no {}".format(cmd)
            config_data.append(cmd)

        # summary information
        for summary in ospf_data.get("summary-address", None) or []:
            if "prefix" not in summary:
                logger.debug("Router %s: 'summary-address' not present in "
                             "input_dict", router)
                continue

            cmd = "summary {}/{}".format(summary["prefix"], summary["mask"])

            _tag = summary.get("tag", None)
            if _tag:
                cmd = "{} tag {}".format(cmd, _tag)

            # 'advertise' defaults to True; only an explicit falsy value
            # turns the summary into a no-advertise one.
            if not summary.get("advertise", True):
                cmd = "{} no-advertise".format(cmd)

            if summary.get("delete", False):
                cmd = "no {}".format(cmd)
            config_data.append(cmd)

        # Push the complete command list once.
        result = create_common_configuration(tgen, router, config_data,
                                             "ospf", build, load_config)

    except InvalidCLIError:
        # Traceback
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: create_ospf_global()")
    return result


def create_router_ospf6(
        tgen, topo, input_dict=None, build=False, load_config=True):
    """
    API to configure ospf6 (OSPFv3) on router.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `build` : Only for initial setup phase this is set as True.
    * `load_config` : Loading the config to router this is set as True.

    Usage
    -----
    input_dict = {
        "r1": {
            "ospf6": {
                "router_id": "22.22.22.22",
            }
        }
    }

    result = create_router_ospf6(tgen, topo, input_dict)

    Returns
    -------
    Result of the last per-router configuration call; False when no router
    in the input carries an 'ospf6' section.
    """
    logger.debug("Entering lib API: create_router_ospf6()")
    result = False

    if not input_dict:
        input_dict = deepcopy(topo)
    else:
        input_dict = deepcopy(input_dict)

    for router in input_dict.keys():
        # The OSPFv3 data lives under "ospf6", not "ospf".
        if "ospf6" not in input_dict[router]:
            logger.debug("Router %s: 'ospf6' not present in input_dict",
                         router)
            continue

        result = __create_ospf6_global(
            tgen, input_dict, router, build, load_config)

    logger.debug("Exiting lib API: create_router_ospf6()")
    return result


def __create_ospf6_global(
        tgen, input_dict, router, build=False, load_config=True):
    """
    Helper API to create ospf6 global configuration.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured.
    * `build` : Only for initial setup phase this is set as True.
    * `load_config` : Loading the config to router this is set as True.

    Returns
    -------
    Value returned by create_common_configuration(), or the formatted
    traceback string when InvalidCLIError is raised.
    """
    result = False
    logger.debug("Entering lib API: __create_ospf6_global()")
    try:
        ospf_data = input_dict[router]["ospf6"]

        # NOTE(review): the config type passed to
        # create_common_configuration() is kept as "ospf" because the set of
        # types it accepts is not visible from this file -- confirm whether
        # a dedicated "ospf6" type exists.
        if ospf_data.get("delete", False):
            config_data = ["no router ospf6"]
            result = create_common_configuration(tgen, router, config_data,
                                                 "ospf", build,
                                                 load_config)
            return result

        config_data = ["router ospf6"]

        router_id = ospf_data.get("router_id", None)
        if ospf_data.get("del_router_id", False):
            config_data.append("no ospf6 router-id")
        if router_id:
            config_data.append("ospf6 router-id {}".format(router_id))

        result = create_common_configuration(tgen, router, config_data,
                                             "ospf", build, load_config)
    except InvalidCLIError:
        # Traceback
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: create_ospf6_global()")
    return result


def config_ospf_interface(tgen, topo, input_dict=None, build=False,
                          load_config=True):
    """
    API to configure ospf interface level settings on router.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `build` : Only for initial setup phase this is set as True.
    * `load_config` : Loading the config to router this is set as True.
                      (accepted for signature parity; not used here)

    Usage
    -----
    r1_ospf_auth = {
        "r1": {
            "links": {
                "r2": {
                    "ospf": {
                        "authentication": 'message-digest',
                        "authentication-key": "ospf",
                        "message-digest-key": "10"
                    }
                }
            }
        }
    }
    result = config_ospf_interface(tgen, topo, r1_ospf_auth)

    Returns
    -------
    When `build` is True: the list of generated commands for the first
    router processed. Otherwise the result of create_common_configuration()
    for the last router (False if there was nothing to iterate).
    """
    logger.debug("Enter lib config_ospf_interface")
    result = False

    if not input_dict:
        input_dict = deepcopy(topo)
    else:
        input_dict = deepcopy(input_dict)

    for router in input_dict.keys():
        config_data = []
        for lnk in input_dict[router]['links'].keys():
            if "ospf" not in input_dict[router]['links'][lnk]:
                logger.debug("Router %s: ospf configs is not present in "
                             "input_dict, passed input_dict %s", router,
                             input_dict)
                continue

            ospf_data = input_dict[router]['links'][lnk]['ospf']
            data_ospf_area = ospf_data.get("area", None)
            data_ospf_auth = ospf_data.get("authentication", None)
            data_ospf_dr_priority = ospf_data.get("priority", None)
            data_ospf_cost = ospf_data.get("cost", None)
            # Presence of the key (any value) requests removal.
            delete = 'del_action' in ospf_data

            # The link may be described under routers or under switches.
            try:
                intf = topo['routers'][router]['links'][lnk]['interface']
            except KeyError:
                intf = topo['switches'][router]['links'][lnk]['interface']

            # interface
            config_data.append("interface {}".format(intf))

            # interface area config
            if data_ospf_area:
                config_data.append("ip ospf area {}".format(data_ospf_area))

            # interface ospf auth
            if data_ospf_auth:
                if data_ospf_auth == 'null':
                    cmd = "ip ospf authentication null"
                elif data_ospf_auth == 'message-digest':
                    cmd = "ip ospf authentication message-digest"
                else:
                    cmd = "ip ospf authentication"

                if delete:
                    cmd = "no {}".format(cmd)
                config_data.append(cmd)

            if "message-digest-key" in ospf_data:
                cmd = "ip ospf message-digest-key {} md5 {}".format(
                    ospf_data["message-digest-key"],
                    ospf_data["authentication-key"])
                if delete:
                    cmd = "no {}".format(cmd)
                config_data.append(cmd)

            if "authentication-key" in ospf_data and \
                    "message-digest-key" not in ospf_data:
                cmd = "ip ospf authentication-key {}".format(
                    ospf_data["authentication-key"])
                if delete:
                    cmd = "no {}".format(cmd)
                config_data.append(cmd)

            # interface ospf dr priority
            # 'is not None' so that an explicit 0 is still configured.
            if data_ospf_dr_priority is not None:
                cmd = "ip ospf priority {}".format(data_ospf_dr_priority)
                if delete:
                    cmd = "no {}".format(cmd)
                config_data.append(cmd)

            # interface ospf cost
            if data_ospf_cost is not None:
                cmd = "ip ospf cost {}".format(data_ospf_cost)
                if delete:
                    cmd = "no {}".format(cmd)
                config_data.append(cmd)

        if build:
            return config_data
        else:
            result = create_common_configuration(tgen, router, config_data,
                                                 "interface_config",
                                                 build=build)
    logger.debug("Exiting lib API: config_ospf_interface()")
    return result


def clear_ospf(tgen, router):
    """
    This API is to clear ospf neighborship by running
    clear ip ospf interface * command,

    Parameters
    ----------
    * `tgen`: topogen object
    * `router`: device under test

    Usage
    -----
    clear_ospf(tgen, "r1")

    Returns
    -------
    False when `router` is not part of the topology, otherwise None.
    """

    logger.debug("Entering lib API: clear_ospf()")
    if router not in tgen.routers():
        return False

    rnode = tgen.routers()[router]

    # Clearing OSPF
    logger.info("Clearing ospf process for router %s..", router)

    run_frr_cmd(rnode, "clear ip ospf interface ")

    logger.debug("Exiting lib API: clear_ospf()")


################################
# Verification procs
################################
@retry(attempts=40, wait=2, return_is_str=True)
def verify_ospf_neighbor(tgen, topo, dut=None, input_dict=None, lan=False):
    """
    This API is to verify ospf neighborship by running
    show ip ospf neighbour command,

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `dut`: device under test
    * `input_dict` : Input dict data, required when configuring from testcase
    * `lan` : verify neighbors in lan topology

    Usage
    -----
    1. To check FULL neighbors.
    verify_ospf_neighbor(tgen, topo, dut=dut)

    2. To check neighbors with their roles.
    input_dict = {
        "r0": {
            "ospf": {
                "neighbors": {
                    "r1": {
                        "state": "Full",
                        "role": "DR"
                    },
                    "r2": {
                        "state": "Full",
                        "role": "DROther"
                    },
                    "r3": {
                        "state": "Full",
                        "role": "DROther"
                    }
                }
            }
        }
    }
    result = verify_ospf_neighbor(tgen, topo, dut, input_dict, lan=True)

    Returns
    -------
    True or False (Error Message)
    """
    logger.debug("Entering lib API: verify_ospf_neighbor()")
    result = False
    if input_dict:
        for router, rnode in tgen.routers().items():
            if 'ospf' not in topo['routers'][router]:
                continue

            if dut is not None and dut != router:
                continue

            logger.info("Verifying OSPF neighborship on router %s:", router)
            show_ospf_json = run_frr_cmd(
                rnode, "show ip ospf neighbor all json", isjson=True)

            # Verifying output dictionary show_ospf_json is empty or not
            if not bool(show_ospf_json):
                errormsg = "OSPF is not running"
                return errormsg

            ospf_data_list = input_dict[router]["ospf"]
            ospf_nbr_list = ospf_data_list['neighbors']

            for ospf_nbr, nbr_data in ospf_nbr_list.items():
                data_ip = topo['routers'][ospf_nbr]['links']
                data_rid = topo['routers'][ospf_nbr]['ospf']['router_id']
                # NOTE(review): neighbor_ip is resolved but not used by the
                # checks below; the lookups are kept because they still
                # reject topologies that lack the expected link data.
                if ospf_nbr in data_ip:
                    nbr_details = nbr_data[ospf_nbr]
                elif lan:
                    for switch in topo['switches']:
                        if 'ospf' in topo['switches'][switch]['links'][
                                router]:
                            neighbor_ip = data_ip[switch]['ipv4'].split(
                                "/")[0]
                        else:
                            continue
                else:
                    neighbor_ip = data_ip[router]['ipv4'].split("/")[0]

                nh_state = None
                neighbor_ip = neighbor_ip.lower()
                nbr_rid = data_rid
                # The JSON 'state' field is "<nbr-state>/<role>".
                try:
                    nh_state = show_ospf_json[nbr_rid][0][
                        'state'].split('/')[0]
                    intf_state = show_ospf_json[nbr_rid][0][
                        'state'].split('/')[1]
                except KeyError:
                    errormsg = "[DUT: {}] OSPF peer {} missing".format(
                        router, nbr_rid)
                    return errormsg

                nbr_state = nbr_data.get("state", None)
                nbr_role = nbr_data.get("role", None)

                if nbr_state:
                    if nbr_state == nh_state:
                        logger.info(
                            "[DUT: {}] OSPF Nbr is {}:{} State {}".format(
                                router, ospf_nbr, nbr_rid, nh_state))
                        result = True
                    else:
                        errormsg = ("[DUT: {}] OSPF is not Converged, neighbor"
                                    " state is {}".format(router, nh_state))
                        return errormsg
                if nbr_role:
                    if nbr_role == intf_state:
                        logger.info(
                            "[DUT: {}] OSPF Nbr is {}: {} Role {}".format(
                                router, ospf_nbr, nbr_rid, nbr_role))
                    else:
                        errormsg = ("[DUT: {}] OSPF is not Converged with rid "
                                    "{}, role is {}".format(
                                        router, nbr_rid, intf_state))
                        return errormsg
    else:
        for router, rnode in tgen.routers().items():
            if 'ospf' not in topo['routers'][router]:
                continue

            if dut is not None and dut != router:
                continue

            logger.info("Verifying OSPF neighborship on router %s:", router)
            show_ospf_json = run_frr_cmd(
                rnode, "show ip ospf neighbor all json", isjson=True)
            # Verifying output dictionary show_ospf_json is empty or not
            if not bool(show_ospf_json):
                errormsg = "OSPF is not running"
                return errormsg

            ospf_nbr_list = topo["routers"][router]["ospf"]['neighbors']
            total_peer = len(ospf_nbr_list.keys())
            no_of_peer = 0
            for ospf_nbr, nbr_data in ospf_nbr_list.items():
                # A non-empty entry names the real neighbor router in "nbr".
                if nbr_data:
                    data_ip = topo['routers'][nbr_data["nbr"]]['links']
                    data_rid = topo['routers'][nbr_data["nbr"]][
                        'ospf']['router_id']
                else:
                    data_ip = topo['routers'][ospf_nbr]['links']
                    data_rid = topo['routers'][ospf_nbr]['ospf']['router_id']
                # NOTE(review): see the matching note in the branch above.
                if ospf_nbr in data_ip:
                    nbr_details = nbr_data[ospf_nbr]
                elif lan:
                    for switch in topo['switches']:
                        if 'ospf' in topo['switches'][switch]['links'][
                                router]:
                            neighbor_ip = data_ip[switch]['ipv4'].split(
                                "/")[0]
                        else:
                            continue
                else:
                    neighbor_ip = data_ip[router]['ipv4'].split("/")[0]

                nh_state = None
                neighbor_ip = neighbor_ip.lower()
                nbr_rid = data_rid
                try:
                    nh_state = show_ospf_json[nbr_rid][0][
                        'state'].split('/')[0]
                except KeyError:
                    errormsg = "[DUT: {}] OSPF peer {} missing,from "\
                        "{} ".format(router, nbr_rid, ospf_nbr)
                    return errormsg

                if nh_state == 'Full':
                    no_of_peer += 1

            if no_of_peer == total_peer:
                logger.info("[DUT: {}] OSPF is Converged".format(router))
                result = True
            else:
                errormsg = ("[DUT: {}] OSPF is not Converged".format(router))
                return errormsg

    logger.debug("Exiting API: verify_ospf_neighbor()")
    return result


@retry(attempts=20, wait=2, return_is_str=True, initial_wait=2)
def verify_ospf_rib(tgen, dut, input_dict, next_hop=None,
                    tag=None, metric=None, fib=None):
    """
    This API is to verify ospf routes by running
    show ip ospf route command.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `dut`: device under test
    * `input_dict` : Input dict data, required when configuring from testcase
    * `next_hop` : next to be verified
    * `tag` : tag to be verified
    * `metric` : metric to be verified
    * `fib` : True if the route is installed in FIB.

    Usage
    -----
    input_dict = {
        "r1": {
            "static_routes": [
                {
                    "network": ip_net,
                    "no_of_ip": 1,
                    "routeType": "N"
                }
            ]
        }
    }

    result = verify_ospf_rib(tgen, dut, input_dict,next_hop=nh)

    Returns
    -------
    True or False (Error Message)
    """

    logger.info("Entering lib API: verify_ospf_rib()")
    result = False
    router_list = tgen.routers()
    additional_nexthops_in_required_nhs = []
    found_hops = []
    for routerInput in input_dict.keys():
        for router, rnode in router_list.items():
            if router != dut:
                continue

            logger.info("Checking router %s RIB:", router)

            # Verifying RIB routes
            command = "show ip ospf route"

            found_routes = []
            missing_routes = []
            # Defined up front so the summary log below is safe even when
            # the route list is empty.
            nh_found = False

            if "static_routes" in input_dict[routerInput] or \
                    "prefix" in input_dict[routerInput]:
                if "prefix" in input_dict[routerInput]:
                    static_routes = input_dict[routerInput]["prefix"]
                else:
                    static_routes = input_dict[routerInput]["static_routes"]

                for static_route in static_routes:
                    cmd = "{} json".format(command)

                    ospf_rib_json = run_frr_cmd(rnode, cmd, isjson=True)

                    # Verifying output dictionary ospf_rib_json is not empty
                    if bool(ospf_rib_json) is False:
                        errormsg = "[DUT: {}] No routes found in OSPF route " \
                            "table".format(router)
                        return errormsg

                    network = static_route["network"]
                    no_of_ip = static_route.get("no_of_ip", 1)
                    _tag = static_route.get("tag", None)
                    _rtype = static_route.get("routeType", None)

                    # Generating IPs for verification
                    ip_list = generate_ips(network, no_of_ip)
                    nh_found = False

                    for st_rt in ip_list:
                        st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))

                        _addr_type = validate_ip_address(st_rt)
                        if _addr_type != 'ipv4':
                            continue

                        if st_rt not in ospf_rib_json:
                            missing_routes.append(st_rt)
                            continue

                        found_routes.append(st_rt)

                        if fib and next_hop:
                            if type(next_hop) is not list:
                                next_hop = [next_hop]

                            # NOTE(review): found_hops accumulates across
                            # routes and only element 0 is inspected; it
                            # also raises IndexError when no entry carries
                            # 'fib'. Left as-is pending confirmation of
                            # the intended JSON layout.
                            for mnh in range(0, len(ospf_rib_json[st_rt])):
                                if 'fib' in ospf_rib_json[st_rt][
                                        mnh]["nexthops"][0]:
                                    found_hops.append(
                                        [rib_r["ip"] for rib_r in
                                         ospf_rib_json[st_rt][mnh][
                                             "nexthops"]])

                            if found_hops[0]:
                                additional_nexthops_in_required_nhs = \
                                    set(next_hop).difference(found_hops[0])

                                if additional_nexthops_in_required_nhs:
                                    logger.info(
                                        "Nexthop "
                                        "%s is not active for route %s in "
                                        "RIB of router %s\n",
                                        additional_nexthops_in_required_nhs,
                                        st_rt, dut)
                                    errormsg = (
                                        "Nexthop {} is not active"
                                        " for route {} in RIB of router"
                                        " {}\n".format(
                                            additional_nexthops_in_required_nhs,
                                            st_rt, dut))
                                    return errormsg
                                else:
                                    nh_found = True

                        elif next_hop and fib is None:
                            if type(next_hop) is not list:
                                next_hop = [next_hop]
                            found_hops = [
                                rib_r["ip"] for rib_r in
                                ospf_rib_json[st_rt]["nexthops"]]

                            if found_hops:
                                additional_nexthops_in_required_nhs = \
                                    set(next_hop).difference(found_hops)

                                if additional_nexthops_in_required_nhs:
                                    logger.info(
                                        "Missing nexthop %s for route"
                                        " %s in RIB of router %s\n",
                                        additional_nexthops_in_required_nhs,
                                        st_rt, dut)
                                    errormsg = (
                                        "Nexthop {} is Missing for "
                                        "route {} in RIB of router {}\n".format(
                                            additional_nexthops_in_required_nhs,
                                            st_rt, dut))
                                    return errormsg
                                else:
                                    nh_found = True

                        if _rtype:
                            if "routeType" not in ospf_rib_json[st_rt]:
                                errormsg = (
                                    "[DUT: {}]: routeType missing "
                                    "for route {} in OSPF RIB \n".format(
                                        dut, st_rt))
                                return errormsg
                            elif _rtype != ospf_rib_json[st_rt]["routeType"]:
                                errormsg = (
                                    "[DUT: {}]: routeType mismatch "
                                    "for route {} in OSPF RIB \n".format(
                                        dut, st_rt))
                                return errormsg
                            else:
                                logger.info(
                                    "[DUT: {}]: Found routeType {} "
                                    "for route {}".format(
                                        dut, _rtype, st_rt))

                        if tag:
                            if "tag" not in ospf_rib_json[st_rt]:
                                errormsg = (
                                    "[DUT: {}]: tag is not"
                                    " present for"
                                    " route {} in RIB \n".format(dut, st_rt))
                                return errormsg

                            # The value compared is the per-route "tag"
                            # from input_dict; `tag` only enables the check.
                            if _tag != ospf_rib_json[st_rt]["tag"]:
                                errormsg = (
                                    "[DUT: {}]: tag value {}"
                                    " is not matched for"
                                    " route {} in RIB \n".format(
                                        dut, _tag, st_rt))
                                return errormsg

                        if metric is not None:
                            if "type2cost" not in ospf_rib_json[st_rt]:
                                errormsg = (
                                    "[DUT: {}]: metric is"
                                    " not present for"
                                    " route {} in RIB \n".format(dut, st_rt))
                                return errormsg

                            if metric != ospf_rib_json[st_rt]["type2cost"]:
                                errormsg = (
                                    "[DUT: {}]: metric value "
                                    "{} is not matched for "
                                    "route {} in RIB \n".format(
                                        dut, metric, st_rt))
                                return errormsg

                if nh_found:
                    logger.info("[DUT: {}]: Found next_hop {} for all OSPF"
                                " routes in RIB".format(router, next_hop))

                if len(missing_routes) > 0:
                    errormsg = ("[DUT: {}]: Missing route in RIB, "
                                "routes: {}".format(dut, missing_routes))
                    return errormsg

                if found_routes:
                    logger.info("[DUT: %s]: Verified routes in RIB, found"
                                " routes are: %s\n", dut, found_routes)
                    result = True

    logger.info("Exiting lib API: verify_ospf_rib()")
    return result


@retry(attempts=10, wait=2, return_is_str=True)
def verify_ospf_interface(tgen, topo, dut=None, lan=False, input_dict=None):
    """
    This API is to verify ospf interface attributes by running
    show ip ospf interface command.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : topology descriptions
    * `dut`: device under test
    * `lan`: if set to true this interface belongs to LAN.
             (accepted but not consulted by this function)
    * `input_dict` : Input dict data, required when configuring from testcase

    Usage
    -----
    input_dict= {
        'r0': {
            'links':{
                's1': {
                    'ospf':{
                        'priority':98,
                        'timerDeadSecs': 4,
                        'area': '0.0.0.3',
                        'mcastMemberOspfDesignatedRouters': True,
                        'mcastMemberOspfAllRouters': True,
                        'ospfEnabled': True,

                    }
                }
            }
        }
    }
    result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict)

    Returns
    -------
    True or False (Error Message)
    """

    logger.debug("Entering lib API: verify_ospf_interface()")
    result = False
    for router, rnode in tgen.routers().items():
        if 'ospf' not in topo['routers'][router]:
            continue

        if dut is not None and dut != router:
            continue

        logger.info("Verifying OSPF interface on router %s:", router)
        show_ospf_json = run_frr_cmd(rnode, "show ip ospf interface json",
                                     isjson=True)

        # Verifying output dictionary show_ospf_json is empty or not
        if not bool(show_ospf_json):
            errormsg = "OSPF is not running"
            return errormsg

        ospf_intf_data = input_dict[router]["links"]
        for ospf_intf, intf_data in ospf_intf_data.items():
            intf = topo['routers'][router]['links'][ospf_intf]['interface']
            # Interfaces absent from the output are skipped, not failed.
            if intf not in show_ospf_json['interfaces']:
                continue

            for intf_attribute in intf_data['ospf']:
                expected = intf_data['ospf'][intf_attribute]
                actual = show_ospf_json['interfaces'][intf][intf_attribute]
                if expected == actual:
                    logger.info("[DUT: %s] OSPF interface %s: %s is %s",
                                router, intf, intf_attribute, expected)
                else:
                    errormsg = ("[DUT: {}] OSPF interface {}: {} is {}, "
                                "Expected is {}".format(
                                    router, intf, intf_attribute,
                                    actual, expected))
                    return errormsg
        result = True
    logger.debug("Exiting API: verify_ospf_interface()")
    return result


@retry(attempts=10, wait=2, return_is_str=True, initial_wait=2)
def verify_ospf_database(tgen, topo, dut, input_dict):
    """
    This API is to verify ospf lsa's by running
    show ip ospf database command.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : topology descriptions
    * `dut`: device under test
    * `input_dict` : Input dict data, required when configuring from testcase

    Usage
    -----
    input_dict = {
        "areas": {
            "0.0.0.0": {
                "Router Link States": {
                    "100.1.1.0-100.1.1.0": {
                        "LSID": "100.1.1.0",
                        "Advertised router": "100.1.1.0",
                        "LSA Age": 130,
                        "Sequence Number": "80000006",
                        "Checksum": "a703",
                        "Router links": 3
                    }
                },
                "Net Link States": {
                    "10.0.0.2-100.1.1.1": {
                        "LSID": "10.0.0.2",
                        "Advertised router": "100.1.1.1",
                        "LSA Age": 137,
                        "Sequence Number": "80000001",
                        "Checksum": "9583"
                    }
                },
            },
        }
    }
    result = verify_ospf_database(tgen, topo, dut, input_dict)

    Returns
    -------
    True or False (Error Message)
    """

    result = False
    router = dut
    logger.debug("Entering lib API: verify_ospf_database()")

    if 'ospf' not in topo['routers'][dut]:
        errormsg = "[DUT: {}] OSPF is not configured on the router.".format(
            dut)
        return errormsg

    rnode = tgen.routers()[dut]

    logger.info("Verifying OSPF database on router %s:", dut)
    show_ospf_json = run_frr_cmd(rnode, "show ip ospf database json",
                                 isjson=True)
    # Verifying output dictionary show_ospf_json is empty or not
    if not bool(show_ospf_json):
        errormsg = "OSPF is not running"
        return errormsg

    # (section key in the JSON / input, label used in messages); only the
    # presence of each LSA key is verified, not the LSA contents.
    area_lsa_types = (
        ('Router Link States', 'Router'),
        ('Net Link States', 'Network'),
        ('Summary Link States', 'Summary'),
        ('ASBR-Summary Link States', 'ASBR Summary'),
    )

    # per-area (intra and inter area) lsa's
    ospf_db_data = input_dict.get("areas", None)
    ospf_external_lsa = input_dict.get('AS External Link States', None)
    if ospf_db_data:
        for ospf_area, area_lsa in ospf_db_data.items():
            # Areas absent from the output are skipped, not failed.
            if ospf_area not in show_ospf_json['areas']:
                continue

            for section, label in area_lsa_types:
                if section not in area_lsa:
                    continue
                for lsa in area_lsa[section]:
                    if lsa in show_ospf_json['areas'][ospf_area][section]:
                        logger.info(
                            "[DUT: %s] OSPF LSDB area %s:%s "
                            "LSA %s", router, ospf_area, label, lsa)
                        result = True
                    else:
                        errormsg = \
                            "[DUT: {}] OSPF LSDB area {}: expected" \
                            " {} LSA is {}".format(
                                router, ospf_area, label, lsa)
                        return errormsg

    if ospf_external_lsa:
        for ospf_ext_lsa in ospf_external_lsa.keys():
            if ospf_ext_lsa in show_ospf_json['AS External Link States']:
                logger.info(
                    "[DUT: %s] OSPF LSDB:External LSA %s",
                    router, ospf_ext_lsa)
                result = True
            else:
                errormsg = \
                    "[DUT: {}] OSPF LSDB : expected" \
                    " External LSA is {}".format(router, ospf_ext_lsa)
                return errormsg

    logger.debug("Exiting API: verify_ospf_database()")
    return result


@retry(attempts=10, wait=2, return_is_str=True)
def verify_ospf_summary(tgen, topo, dut, input_dict):
    """
    This API is to verify ospf summary addresses by running
    show ip ospf summary detail command.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : topology descriptions
    * `dut`: device under test
    * `input_dict` : Input dict data, required when configuring from testcase

    Usage
    -----
    input_dict = {
        "11.0.0.0/8": {
            "Summary address": "11.0.0.0/8",
            "Metric-type": "E2",
            "Metric": 20,
            "Tag": 0,
            "External route count": 5
        }
    }
    result = verify_ospf_summary(tgen, topo, dut, input_dict)

    Returns
    -------
    True or False (Error Message)
    """

    logger.debug("Entering lib API: verify_ospf_summary()")
    result = False
    router = dut

    logger.info("Verifying OSPF summary on router %s:", router)

    if 'ospf' not in topo['routers'][dut]:
        errormsg = "[DUT: {}] OSPF is not configured on the router.".format(
            router)
        return errormsg

    rnode = tgen.routers()[dut]
    show_ospf_json = run_frr_cmd(rnode, "show ip ospf summary detail json",
                                 isjson=True)

    # Verifying output dictionary show_ospf_json is empty or not
    if not bool(show_ospf_json):
        errormsg = "OSPF is not running"
        return errormsg

    ospf_summary_data = input_dict
    for ospf_summ, summ_data in ospf_summary_data.items():
        # Summaries absent from the output are skipped, not failed.
        if ospf_summ not in show_ospf_json:
            continue
        summary = ospf_summary_data[ospf_summ]['Summary address']
        if summary in show_ospf_json:
            for summ in summ_data:
                expected = summ_data[summ]
                actual = show_ospf_json[summary][summ]
                if expected == actual:
                    logger.info("[DUT: %s] OSPF summary %s:%s is %s",
                                router, summary, summ, expected)
                    result = True
                else:
                    errormsg = ("[DUT: {}] OSPF summary {}:{} is {}, "
                                "Expected is {}".format(
                                    router, summary, summ, actual, expected))
                    return errormsg

    logger.debug("Exiting API: verify_ospf_summary()")
    return result