进阶教程
自定义厂商
目前 net_inspect 支持以下厂商的识别:
Ciscocisco_iosHuaweihuawei_vrpH3Chp_comwareMaipumaipu_mypowerRuijieruijie_os
但是在 ntc-templates-elinpf 中包含的的远远不止这5家厂商,
当有需要时,可以自定义厂商,只需要继承 DefaultVendor 类即可。
例如,我们需要识别一台 Juniper 的设备,我们可以这样做:
from net_inspect.vendor import DefaultVendor
class Juniper(DefaultVendor):
PLATFORM = 'juniper_junos'
VERSION_COMMAND = 'sh[[ow]] ver[[sion]]'
KEYWORD_REG = r'JUNOS'
INVALID_STR = r'Invalid input detected at'
PLATFORM表示在ntc-templates-elinpf中的厂商名称前缀VERSION_COMMAND表示获取设备版本的命令,[[和]]包裹的内容是可以简写的,例如sh ver和sh[[ow]] ver[[sion]]是等价的KEYWORD_REG表示设备厂商信息的关键字,可以使用正则匹配,匹配到关键字后识别为对应厂商设备INVALID_STR表示命令回显是无效回显的关键字,必须命令错误时的回显等。
只需要这样定义一下, net_inspect 在执行的时候会自动识别出来。并将对应的 juniper_junos 模板调用执行解析。
自定义输入模块
由于各个网络管理平台平台或者自动化工具输出的日志格式不尽相同, 所以 net_inspect 提供了自定义输入模块的功能,可以自定义输入模块,实现自己的信息解析逻辑。
以一个网管平台的采集信息为例,采集的 日志文件名 格式如下:
<hostname>_<ip>_<date>.diag
例如:
A_FOO_BAR_DR01_127.0.0.1_20220221180010.diag
而采集的 日志内容 格式如下:
-------------------------dis clock-------------------------
17:05:11 bj Mon 02/21/2022
Time Zone : bj add 08:00:00
-------------------------dis version-------------------------
H3C Comware Platform Software
Comware Software, Version 5.20, Release 2202
Copyright (c) 2004-2010 Hangzhou H3C Tech. Co., Ltd. All rights reserved.
H3C S5500-28C-EI uptime is 115 weeks, 1 day, 8 hours, 37 minutes
H3C S5500-28C-EI with 1 Processor
256M bytes SDRAM
32768K bytes Flash Memory
Hardware Version is REV.C
CPLD Version is 002
Bootrom Version is 609
[SubSlot 0] 24GE+4SFP Hardware Version is REV.C
我们需要做的事情就是将 hostname, ip(可选), 每个执行命令 和 执行命令回显 提取出来交给 net_inspect 处理。
我们需要定义一个 InputPlugin 类,继承自 InputPluginAbstract 类,实现 main 方法。
import os
import re
from net_inspect import InputPluginAbstract, InputPluginResult, NetInspect
from net_inspect.exception import InputFileTypeError
device_info_reg = r'(?P<hostname>\S+)\_(?P<ip>(?:\d+\.){3}(?:\d+)).*' # 日志文件名正则表达式
command_line_reg = (
r'^-------------------------(?P<cmd>[^-].*?)-------------------------$' # 命令行正则表达式
)
class CustomInputPlugin(InputPluginAbstract): # 继承了 InputPluginAbstract
def main(self, file_path: str, stream: str) -> InputPluginResult:
result = InputPluginResult() # 创建一个 InputPluginResult 对象
match = re.match(
device_info_reg, os.path.basename(file_path)
) # 获取文件名,然后匹配正则表达式
if not match: # 如果文件名不符合正则表达式
raise InputFileTypeError(file_path) # 抛出异常,给 net_inspect 处理
result.hostname = match.group('hostname') # 保存获取的设备名
result.ip = match.group('ip') # 保存获取设备的 IP, 可选, 如果没有也可以不设置
command = '' # 用于存储命令
content = [] # 用于存储命令输出内容
for line in stream.splitlines(): # 逐行处理
"""
逐行处理,如果匹配到命令行,就保存上一个命令的内容,然后清空内容,继续处理下一行
如果匹配到非命令行,就直接存储内容
"""
if re.match(command_line_reg, line): # 如果匹配到命令行
if command: # 当有命令的时候,说明是上一个命令的结尾,要保存
result.add_cmd(command, '\n'.join(content)) # 保存命令和内容到 result 中
command = re.match(command_line_reg, line).group('cmd') # 获取命令
content = [] # 清空内容
continue
content.append(line) # 匹配到非命令行,就直接存储内容
if content: # 最后一个命令由于没有保存,所以要单独保存一下
result.add_cmd('command', '\n'.join(content))
return result
if __name__ == '__main__':
net = NetInspect() # 实例化 NetInspect 对象
net.set_input_plugin(CustomInputPlugin) # 设置输入插件为自定义的插件
net.run(input_path='log_custom') # 执行文件夹内的解析
for device in net.cluster.devices:
clock = ''
for row in device.parse_result('dis clock'): # 获取 display clock 的解析结果
clock = f"{row['year']}-{row['month']}-{row['day']} {row['time']}"
print(
'[+]',
device.info.hostname,
device.info.vendor,
device.info.model,
'clock:',
clock,
)
输出结果如下:
[+] B_FOO_BAR_DS02 H3C S5500-28C-EI clock: 2022-02-21 17:05:11
从上面的示例中,可以看到,实现了 InputPlugin 的 main 方法。
其中 main() 方法的参数如下:
file_path输入的单个日志文件名称stream输入的日志文件内容
返回指定的是一个 InputPluginResult 类,需要做到:
设置设备名称
通过
add_cmd()方法添加每个执行的命令和返回的内容
最后将 InputPlugin 类注册到 net_inspect 中,即可使用。
net.set_input_plugin(CustomInputPlugin)
增加base_info条目
在 BaseInfo 中,net_inspect 定义了一些基本的信息,比如 hostname, ip, cpu_usage 等等, 实现跨设备类型的统一调用。
但是有时候还需要增加一些自定义的字段,方便后续统一调用。
例如,我们需要增加一个 clock 字段,用于获取设备的时钟信息,我们可以这样做:
from net_inspect import NetInspect, EachVendorDeviceInfo, BaseInfo, Device
class AppendClock(BaseInfo):
clock: str = '' # 时钟信息
class EachVendorWithClock(EachVendorDeviceInfo):
base_info_class = AppendClock # 设置base_info的类
def do_huawei_vrp_baseinfo_2(self, device: Device, info: AppendClock):
# 添加do_<vendor_platform>_baseinfo_<something>方法,可以自动运行
for row in device.parse_result('display clock'):
info.clock = f'{row["year"]}-{row["month"]}-{row["day"]} {row["time"]}'
def do_hp_comware_baseinfo_2(self, device: Device, info: AppendClock):
for row in device.parse_result('display clock'):
info.clock = f'{row["year"]}-{row["month"]}-{row["day"]} {row["time"]}'
# ... 可自行其他厂商的do_<vendor_platform>_baseinfo_<something>方法
if __name__ == '__main__':
net = NetInspect()
net.set_plugins(input_plugin='console')
net.set_base_info_handler(EachVendorWithClock) # 设置获取设备基本信息的处理类
net.run(input_path='logs')
print('total devices:', len(net.cluster.devices))
for device in net.cluster.devices:
info = device.info # type: AppendClock
print(' | '.join([info.hostname, info.clock]))
执行的输出结果如下:
total devices: 3
Switch_A | 2021-03-19 10:23:08
Switch_B | 2021-03-19 10:24:17
Switch_C | 2021-03-19 10:32:17
可以看到,我们增加 huawei_vrp 和 hp_comware 平台的 clock 信息,使用 device.info.clock 进行统一调用。
备注
没有实现 clock 的平台,将会以空字符串作为默认值。
增加分析条目
net_inspect 同样可以增加自定义的分析条目,会在 run_analysis() 执行的时候自动运行。
例如,我们需要增加一个检查OSPF邻居状态的分析模块
from __future__ import annotations
from typing import TYPE_CHECKING
from net_inspect import NetInspect, vendor
from net_inspect.analysis_plugin import analysis, AnalysisPluginAbc
if TYPE_CHECKING:
from net_inspect.analysis_plugin import TemplateInfo
from net_inspect.domain import AnalysisResult
class AnalysisPluginWithOSPFStatus(AnalysisPluginAbc):
"""OSPF status 状态不能为Init"""
@analysis.vendor(vendor.Huawei)
@analysis.template_key('huawei_vrp_display_ospf_peer_brief.textfsm', ['neighbor', 'state'])
def huawei_vrp(template: TemplateInfo, result: AnalysisResult):
"""华为状态检查"""
for row in template['display ospf peer brief']:
if row['state'].lower() == 'init':
result.add_warning(f'{row["neighbor"]} is in init state')
if __name__ == '__main__':
net = NetInspect()
net.set_plugins(input_plugin='console')
net.run(input_path='logs')
print('total devices:', len(net.cluster.devices))
for device in net.cluster.devices:
ospf_status = device.analysis_result.get('ospf status')
warning_list = []
for alarm in ospf_status:
if alarm.is_warning:
warning_list.append(alarm.message)
print(' | '.join([device.info.hostname, ', '.join(warning_list)]))
这样就得到了一个可重复利用的分析模块,后续在想要检查OSPF邻居状态的时候,只需要调用这个模块即可。
这里只添加了 huawei_vrp 这一个平台,其他平台增加方法同上。
编写这个需要注意:
@analysis.template_key中第一个参数是textfsm文件名称, 第二个是其中将要用到的变量。类方法不需要self这个关键字。
结果加入到result中即可,不需要返回值。
整个过程不用单独设置,所有信息会直接写入analysis全局变量中。
类注释和方法注释必须要写,因为会用到这个注释作为分析结果的标题。
自定义输出模块
对结果进行调用,可以直接以写脚本的方式,也可以自定义输出模块。
自定义输出模块的方式方便二次调用,比如写一个输出table的例子:
from net_inspect import NetInspect, OutputPluginAbstract
from rich.table import Table
from rich.console import Console
class Output(OutputPluginAbstract):
def main(self):
self.check_args('title') # 检查是否提供title参数
console = Console()
table = Table(
title=self.args.output_params.get('title'),
show_lines=False
)
columns = ['hostname', 'ip', 'model', 'version', 'power status']
for col in columns:
table.add_column(col, justify='center')
table.row_styles = ['green']
for device in self.args.devices:
info = device.info
table.add_row(
info.hostname,
info.ip,
info.model,
info.version,
'Abnormal' if info.analysis.power else 'Normal'
)
console.print(table)
if __name__ == '__main__':
net = NetInspect()
net.set_plugins(input_plugin='console', output_plugin=Output)
cluster = net.run(input_path='log', output_plugin_params={
'title': '设备信息'})
输出结果:
可以看到,自定义了一个输出模块,可以直接调用,而不需要写脚本。
output_plugin_params 中的所有参数都会传递到 OutputPluginAbstract 类中的 output_params 变量中,可以直接使用。
手动添加设备
有时候我们需要手动添加设备,比如我们需要添加一个设备, 但是设备没有文件日志,我们可以手动添加,并且指定设备的厂家,这样就可以使用对应厂家的分析模块。
from net_inspect import NetInspect, InputPluginResult, vendor
net = NetInspect()
d = InputPluginResult() # 创建一个InputPluginResult对象
d.add_cmd('display clock', "2021-03-19 10:23:08+08:00") # 添加命令和结果
d.hostname = 'Device' # 指定设备名称
d.vendor = vendor.Huawei # 指定设备厂家平台
net.add_device(d) # 添加设备到 NetInspect 中
net.run()
for device in net.cluster.devices:
print(device.info.hostname)
print(device.parse_result('dis clo'))
output:
Device
[{'time': '10:23:08', 'timezone': '', 'dayweek': '', 'year': '2021', 'month': '03', 'day': '19'}]