ELK通过SNMP免编程实现IP/MAC地址资产监控
大体思路
logstash input有SNMP插件,利用logstash定时从网络设备中获取IP/MAC地址信息,存入elasticsearch,然后利用kibana实现基本查询和可视化。
每个间隔周期获取全部IP/MAC地址信息。
流程方法
与ARP表类似的SNMP信息的位置在:ipNetToMediaTable(.1.3.6.1.2.1.4.22)( IP-MIB)
与MAC地址表相似的SNMP信息位置在:dot1dTpFdbTable(.1.3.6.1.2.1.17.4.3)( BRIDGE-MIB)
这两个table直接抓就可以获得全部IP、MAC地址信息了,但是对应的port不是直接端口描述,为了日后方便使用就需要准备好端口IP到端口描述的转换。
dot1dTpFdbTable(.1.3.6.1.2.1.17.4.3):
dot1dTpFdbPort(.1.3.6.1.2.1.17.4.3.1.2):对应的是dot1dBasePortTable(.1.3.6.1.2.1.17.1.4)下面的dot1dBasePort(.1.3.6.1.2.1.17.1.4.1.1)
dot1dBasePortTable表中有dot1dBasePortIfIndex(.1.3.6.1.2.1.17.1.4.1.2),对应的是ifTable(.1.3.6.1.2.1.2.2)下面的ifIndex(.1.3.6.1.2.1.2.2.1.1),ifTable下有端口描述ifDescr(.1.3.6.1.2.1.2.2.1.2)
ipNetToMediaTable(.1.3.6.1.2.1.4.22):
ipNetToMediaIfIndex(.1.3.6.1.2.1.4.22.1.1)对应的是ifTable(.1.3.6.1.2.1.2.2)下面的ifIndex(.1.3.6.1.2.1.2.2.1.1),ifTable下有端口描述ifDescr(.1.3.6.1.2.1.2.2.1.2)

logstash配置
这是获取ifindex表的logstash配置:短时间内运行一次相当于初始化就可以了
input {
snmp {
hosts => [{host => "udp:192.168.0.249/161" community => "public"},
{host => "udp:192.168.20.253/161" community => "public"},
{host => "udp:192.168.12.253/161" community => "public"},
{host => "udp:192.168.0.241/161" community => "public"}
]
tables => [{"name"=> "iftable" "columns"=> ["1.3.6.1.2.1.2.2.1.1","1.3.6.1.2.1.2.2.1.2"]}]
}
#上面直接取回来的每个主机是一条信息,为了方便以后处理利用split分割成多条信息
}
filter {
split { field => "iftable" }
mutate {
rename => { "[iftable][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifIndex]" => "ifIndex"}
rename => { "[iftable][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifDescr]" => "ifDescr"}
remove_field => ["iftable"]
add_field => {"cmdbtype" => "iftable"}
}
}
output {
elasticsearch{
hosts=> ["192.168.6.44:9200"]
index=> "nhserear-snmpiftable-%{+YYYY.MM.dd}"
}
}
这是获得fdbbaseport表的logstash配置:短时间内运行一次相当于初始化就可以了
input {
snmp {
hosts => [{host => "udp:192.168.0.249/161" community => "public"},
{host => "udp:192.168.20.253/161" community => "public"},
{host => "udp:192.168.12.253/161" community => "public"},
{host => "udp:192.168.0.241/161" community => "public"}
]
tables => [{"name"=> "fdbtable" "columns"=> ["1.3.6.1.2.1.17.1.4.1.1","1.3.6.1.2.1.17.1.4.1.2"]}]
}
}
filter {
split { field => "fdbtable" }
mutate {
rename => { "[fdbtable][iso.org.dod.internet.mgmt.mib-2.dot1dBridge.dot1dBase.dot1dBasePortTable.dot1dBasePortEntry.dot1dBasePort]" => "fdbport"}
rename => { "[fdbtable][iso.org.dod.internet.mgmt.mib-2.dot1dBridge.dot1dBase.dot1dBasePortTable.dot1dBasePortEntry.dot1dBasePortIfIndex]" => "ifIndex"}
remove_field => ["fdbtable"]
add_field => {"cmdbtype" => "fdbtable"}
}
elasticsearch {
hosts =>["192.168.6.44:9200"]
index => "nhserear-snmpiftable-2021.01.20"
query =>"ifIndex:%{[ifIndex]} AND host:%{[host]}"
fields => { "ifDescr" => "ifDescr" }
}
}
output {
elasticsearch{
hosts=> ["192.168.6.44:9200"]
index=> "nhserear-snmpfdbtable-%{+YYYY.MM.dd}"
}
}
注意filter elasticsearch这个插件中有几个缺省配置:
sort:
查询结果排序的方式,缺省是"@timestamp:desc",按时间戳 倒序。
result_size:
查询结果返回的个数,缺省值是1.
这两个参数组合后的结果就是,每次elasticsearch查询的是最新的一个结果。
下面是正常运行时logstash的配置:
input {
snmp {
interval => 60
hosts => [{host => "udp:192.168.0.249/161" community => "public"},
{host => "udp:192.168.20.253/161" community => "public"},
{host => "udp:192.168.12.253/161" community => "public"},
{host => "udp:192.168.0.241/161" community => "public"}
]
tables => [{"name"=> "mac-address" "columns"=> ["1.3.6.1.2.1.17.4.3.1.1","1.3.6.1.2.1.17.4.3.1.2"] },
{"name"=> "arp-address" "columns"=>["1.3.6.1.2.1.4.22.1.1","1.3.6.1.2.1.4.22.1.2","1.3.6.1.2.1.4.22.1.3"]}]
}
}
filter {
clone {
clones => [event]
add_field => { "clone" => "true" }
}
if [clone] { mutate {remove_field => ["mac-address"] }}
else { mutate { remove_field => ["arp-address"]}}
if [mac-address] {
split { field => "mac-address" }
mutate {
rename => { "[mac-address][iso.org.dod.internet.mgmt.mib-2.dot1dBridge.dot1dTp.dot1dTpFdbTable.dot1dTpFdbEntry.dot1dTpFdbAddress]" => "MACaddress"}
rename => { "[mac-address][iso.org.dod.internet.mgmt.mib-2.dot1dBridge.dot1dTp.dot1dTpFdbTable.dot1dTpFdbEntry.dot1dTpFdbPort]" => "FDBPort"}
remove_field => ["mac-address"]
add_field => {"cmdbtype" => "MACtable"}
}
elasticsearch {
hosts =>["192.168.6.44:9200"]
index => "nhserear-snmpfdbtable-2021.01.20"
query =>"fdbport:%{[FDBPort]} AND host:%{[host]}"
fields => { "ifDescr" => "ifDescr" }
}
}
if [arp-address] {
split { field => "arp-address" }
mutate {
rename => { "[arp-address][iso.org.dod.internet.mgmt.mib-2.ip.ipNetToMediaTable.ipNetToMediaEntry.ipNetToMediaIfIndex]" => "ifIndex"}
rename => { "[arp-address][iso.org.dod.internet.mgmt.mib-2.ip.ipNetToMediaTable.ipNetToMediaEntry.ipNetToMediaNetAddress]" => "IPaddress"}
rename => { "[arp-address][iso.org.dod.internet.mgmt.mib-2.ip.ipNetToMediaTable.ipNetToMediaEntry.ipNetToMediaPhysAddress]" => "MACaddress"}
remove_field => ["arp-address"]
add_field => {"cmdbtype" => "ARPtable"}
}
elasticsearch {
hosts =>["192.168.6.44:9200"]
index => "nhserear-snmpiftable-2021.01.20"
query =>"ifIndex:%{[ifIndex]} AND host:%{[host]}"
fields => { "ifDescr" => "ifDescr" }
}
}
}
output {
elasticsearch{
hosts=> ["192.168.6.44:9200"]
index=> "nhserear-snmp-%{+YYYY.MM.dd}"
}
}
简单做了一个可视化效果:

*注:还是有些功能在免代码情况无法实现,后续可能通过定制前端实现
存在的问题
目前发现至少是华为华三的二层交换机和三层交换机,已经和传统我们对交换机MAC地址、ARP表的理解不太一样了,传统上我认为只要有二层以太帧流入交换机,数据帧头部的源MAC地址就会自动被二层交换机学习到,这样以来,以后再有发往该MAC地址的数据帧交换机就知道往哪里交换了,这是以前交换机原理里面的知识。如果按照上面的说明,二层交换机三层交换机中MAC地址表应该是全的,也就是所有流经该设备的源MAC地址都会被记入MAC地址表,但最近发现华为、华三的交换机上如果存在ARP表项的MAC地址,就不会在出现在MAC地址表中了,不知道这是不是现在交换设备的新功能。
通过SNMP获取的ipNetToMediaTable(.1.3.6.1.2.1.4.22)和通过命令行直接在交换机上看到的ARP表是有区别的,三层交换机ARP表中能够显示IP、MAC地址对应的具体物理端口号,ipNetToMediaTable只能告诉你该IP、MAC对应的三层接口的IFINDEX,也就说是三层接口号。
上面两个小问题最后导致一个问题是,在交换机上通过ARP表可以很容易查找到IP、MAC所在的具体物理端口,而通过SNMP方法,如果查IP,可以告诉你在那个三层接口上,例如vlan interface,查MAC地址,如果这个MAC有ARP记录,则MAC地址表中就不会记录,还是只能定位到三层接口,这个MAC地址没有ARP记录,只有这种情况才可能通过MAC地址表获得具体物理接口位置。
查了很多资料,包括H3C私有的MIB库,目前没有找到能够通过SNMP准确获取ARP表的方法。
后来发现并不是交换机MAC地址、arp表发生了变化,而是做实验的这套设备,三层交换机并不是再VLAN接口上配置的IP地址,实现VLAN间路由,而是把物理接口直接改为路由接口(已经不是交换接口,部分三层交换机支持物理路由接口),路由接口直连二层交换机,在这种情况下,三层交换机上是看不到MAC地址表,因为MAC地址表是为了二层交换而建立的,三层路由接口并无直接二层交换的能力。