本文主要讲述官方提供的客户端以及自己写的增删查改工具:
ros_tool.py 功能总汇,展示界面用了python的GUL,模块用的为 tkinter 模块
其他文件为单个功能文件。
链接:https://pan.baidu.com/s/1_NIjG6gCQcnbp9Vwfi9Jyw 密码:98ex
运行方式:python3 ros_tool.py ip username password (LINUX下,装过ros,改为自己的 ip,用户名,密码 直接运行即可)
ros为python以及其他语言提供了接口,官方给出了一个 TCP客户端的例子:
# !/usr/bin/env python3
# -*- coding:utf-8 -*-
import sys, time, binascii, socket, select
import hashlib
class ApiRos:
"Routeros api"
def __init__(self, sk):
self.sk = sk
self.currenttag = 0
def login(self, username, pwd):
for repl, attrs in self.talk(["/login"]):
chal = binascii.unhexlify((attrs['=ret']).encode('UTF-8'))
md = hashlib.md5()
md.update(b'\x00')
md.update(pwd.encode('UTF-8'))
md.update(chal)
self.talk(["/login", "=name=" + username,
"=response=00" + binascii.hexlify(md.digest()).decode('UTF-8')])
def talk(self, words):
if self.writeSentence(words) == 0: return
r = []
while 1:
i = self.readSentence();
if len(i) == 0: continue
reply = i[0]
attrs = {}
for w in i[1:]:
j = w.find('=', 1)
if (j == -1):
attrs[w] = ''
else:
attrs[w[:j]] = w[j + 1:]
r.append((reply, attrs))
if reply == '!done': return r
def writeSentence(self, words):
ret = 0
for w in words:
self.writeWord(w)
ret += 1
self.writeWord('')
return ret
def readSentence(self):
r = []
while 1:
w = self.readWord()
if w == '': return r
r.append(w)
def writeWord(self, w):
print(("<<< " + w))
self.writeLen(len(w))
self.writeStr(w)
def readWord(self):
ret = self.readStr(self.readLen())
print((">>> " + ret))
return ret
def writeLen(self, l):
if l < 0x80:
self.writeStr(chr(l))
elif l < 0x4000:
l |= 0x8000
self.writeStr(chr((l >> 8) & 0xFF))
self.writeStr(chr(l & 0xFF))
elif l < 0x200000:
l |= 0xC00000
self.writeStr(chr((l >> 16) & 0xFF))
self.writeStr(chr((l >> 8) & 0xFF))
self.writeStr(chr(l & 0xFF))
elif l < 0x10000000:
l |= 0xE0000000
self.writeStr(chr((l >> 24) & 0xFF))
self.writeStr(chr((l >> 16) & 0xFF))
self.writeStr(chr((l >> 8) & 0xFF))
self.writeStr(chr(l & 0xFF))
else:
self.writeStr(chr(0xF0))
self.writeStr(chr((l >> 24) & 0xFF))
self.writeStr(chr((l >> 16) & 0xFF))
self.writeStr(chr((l >> 8) & 0xFF))
self.writeStr(chr(l & 0xFF))
def readLen(self):
c = ord(self.readStr(1))
if (c & 0x80) == 0x00:
pass
elif (c & 0xC0) == 0x80:
c &= ~0xC0
c <<= 8
c += ord(self.readStr(1))
elif (c & 0xE0) == 0xC0:
c &= ~0xE0
c <<= 8
c += ord(self.readStr(1))
c <<= 8
c += ord(self.readStr(1))
elif (c & 0xF0) == 0xE0:
c &= ~0xF0
c <<= 8
c += ord(self.readStr(1))
c <<= 8
c += ord(self.readStr(1))
c <<= 8
c += ord(self.readStr(1))
elif (c & 0xF8) == 0xF0:
c = ord(self.readStr(1))
c <<= 8
c += ord(self.readStr(1))
c <<= 8
c += ord(self.readStr(1))
c <<= 8
c += ord(self.readStr(1))
return c
def writeStr(self, str):
n = 0;
while n < len(str):
r = self.sk.send(bytes(str[n:], 'UTF-8'))
if r == 0: raise RuntimeError("connection closed by remote end")
n += r
def readStr(self, length):
ret = ''
while len(ret) < length:
s = self.sk.recv(length - len(ret))
if s == '': raise RuntimeError("connection closed by remote end")
ret += s.decode('UTF-8', 'replace')
return ret
def main():
s = None
for res in socket.getaddrinfo(sys.argv[1], "8728", socket.AF_UNSPEC, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
try:
s = socket.socket(af, socktype, proto)
except (socket.error, msg):
s = None
continue
try:
s.connect(sa)
except (socket.error, msg):
s.close()
s = None
continue
break
if s is None:
print('could not open socket')
sys.exit(1)
apiros = ApiRos(s);
apiros.login(sys.argv[2], sys.argv[3]);
#模拟用户自己输入代码进行api控制
tmpcommand=input('请输入你想查询的命令')
inputsentence = []
inputsentence.append(tmpcommand)
apiros.writeSentence(inputsentence)
while 1:
x = apiros.readSentence()
#print(x)
if x == ['!done'] or x==['!re', '=status=finished']:
break
#导出ros配置到ROS本地ghg1.rsc文件
# inputsentence = []
#
# inputsentence.append('/export')
# inputsentence.append('=file=ghg1.rsc')
# apiros.writeSentence(inputsentence)
# while 1:
# x = apiros.readSentence()
# #print(x)
# if x == ['!done'] or x==['!re', '=status=finished']:
# break
#从ros上传文件到ftp的代码
# inputsentence = []
# inputsentence.append('/tool/fetch')
# inputsentence.append('=address=192.168.0.108')
# inputsentence.append('=src-path=ghg1.rsc')
# inputsentence.append('=user=xxxxx')
# inputsentence.append('=mode=ftp')
# inputsentence.append('=password=xxxxx')
# inputsentence.append('=dst-path=123.rsc')
# inputsentence.append('=upload=yes')
# apiros.writeSentence(inputsentence)
# inputsentence = []
# while 1:
# x = apiros.readSentence()
# #print(x)
# if x == ['!done'] or x==['!re', '=status=finished']:
# break
#删除文件代码
# inputsentence = []
# inputsentence.append('/file/remove')
# inputsentence.append('=numbers=ghg1.rsc')
# apiros.writeSentence(inputsentence)
# while 1:
# x = apiros.readSentence()
# print(x)
# if x == ['!done'] or x==['!re', '=status=finished']:
# break
#官方循环代码,等待你输入命令行,可以用来测试代码命令行
# while 1:
# r = select.select([s, sys.stdin], [], [], None)
# if s in r[0]:
# # something to read in socket, read sentence
# x = apiros.readSentence()
#
# if sys.stdin in r[0]:
# # read line from input and strip off newline
# l = sys.stdin.readline()
# print(l)
# l = l[:-1]
# print(l)
#
#
# # if empty line, send sentence and start with new
# # otherwise append to input sentence
# if l == '':
# apiros.writeSentence(inputsentence)
# inputsentence = []
# else:
# inputsentence.append(l)
if __name__ == '__main__':
main()
下面是本人写的一个小工具,可以进行路由的增删改查,代码有很多不足之处,删除功能时 容易出现问题,做了死循环,根据时间控制结束,性能非常低,有兴趣的可以改下:
运行方式:python3 ros_tool.py ip username password (LINUX下,装过ros,改为自己的 ip,用户名,密码 直接运行即可)
# !/usr/bin/env python
# -*- coding:utf-8 -*-
# Author:yhq
import sys, time, binascii, socket, select
from datetimeimport datetime
# import posix,
import hashlib
from tkinterimport *
# import subprocess
root = Tk()
root.title("ros工具")
root.geometry('1100x800')
class ApiRos:
"Routeros api"
def __init__(self, sk):
"""
初始化函数
:paramsk:
"""
self.sk = sk
self.currenttag =0
def login(self, username, pwd):
"""
登录函数
:paramusername:
:parampwd:
:return:
"""
for repl, attrsin self.talk(["/login"]):
chal = binascii.unhexlify((attrs['=ret']).encode('UTF-8'))
# 对 pwd 进行加密
md = hashlib.md5()
md.update(b'\x00')
md.update(pwd.encode('UTF-8'))
md.update(chal)
self.talk(["/login", "=name=" + username,
"=response=00" + binascii.hexlify(md.digest()).decode('UTF-8')])
def talk(self, words):
"""
:paramwords:
:return:
"""
if self.writeSentence(words) ==0:
return
r = []
while 1:
# python2 中 while 1 比 while True 执行速度快 大概快1/3 python3中两者基本没区别
i =self.readSentence()
if len(i) ==0:
continue
reply = i[0]
attrs = {}
for win i[1:]:
j = w.find('=', 1)
if j == -1:
attrs[w] =''
else:
attrs[w[:j]] = w[j +1:]
r.append((reply, attrs))
if reply =='!done':
return r
def writeSentence(self, words):
"""
:paramwords:
:return:
"""
ret =0
for win words:
self.writeWord(w)
ret +=1
self.writeWord('')
return ret
def readSentence(self):
"""
:return:
"""
r = []
while 1:
w =self.readWord()
if w =='':
return r
r.append(w)
def writeWord(self, w):
"""
:paramw:
:return:
"""
print(("<<< " + w))
self.writeLen(len(w))
self.writeStr(w)
def readWord(self):
"""
:return:
"""
ret =self.readStr(self.readLen())
print((">>> " + ret))
return ret
def writeLen(self, l):
"""
:paraml:
:return:
"""
if l <0x80:
self.writeStr(chr(l))
elif l <0x4000:
l |=0x8000
self.writeStr(chr((l >>8) &0xFF))
self.writeStr(chr(l &0xFF))
elif l <0x200000:
l |=0xC00000
self.writeStr(chr((l >>16) &0xFF))
self.writeStr(chr((l >>8) &0xFF))
self.writeStr(chr(l &0xFF))
elif l <0x10000000:
l |=0xE0000000
self.writeStr(chr((l >>24) &0xFF))
self.writeStr(chr((l >>16) &0xFF))
self.writeStr(chr((l >>8) &0xFF))
self.writeStr(chr(l &0xFF))
else:
self.writeStr(chr(0xF0))
self.writeStr(chr((l >>24) &0xFF))
self.writeStr(chr((l >>16) &0xFF))
self.writeStr(chr((l >>8) &0xFF))
self.writeStr(chr(l &0xFF))
def readLen(self):
"""
:return:
"""
c =ord(self.readStr(1))
if (c &0x80) ==0x00:
pass
elif (c &0xC0) ==0x80:
c &= ~0xC0
c <<=8
c +=ord(self.readStr(1))
elif (c &0xE0) ==0xC0:
c &= ~0xE0
c <<=8
c +=ord(self.readStr(1))
c <<=8
c +=ord(self.readStr(1))
elif (c &0xF0) ==0xE0:
c &= ~0xF0
c <<=8
c +=ord(self.readStr(1))
c <<=8
c +=ord(self.readStr(1))
c <<=8
c +=ord(self.readStr(1))
elif (c &0xF8) ==0xF0:
c =ord(self.readStr(1))
c <<=8
c +=ord(self.readStr(1))
c <<=8
c +=ord(self.readStr(1))
c <<=8
c +=ord(self.readStr(1))
return c
def writeStr(self, str):
"""
:paramstr:
:return:
"""
n =0
while n
r =self.sk.send(bytes(str[n:], 'UTF-8'))
if r ==0:
raise RuntimeError("connection closed by remote end")
n += r
def readStr(self, length):
"""
:paramlength:
:return:
"""
ret =''
while len(ret) < length:
s =self.sk.recv(length -len(ret))
if s =='':
raise RuntimeError("connection closed by remote end")
ret += s.decode('UTF-8', 'replace')
return ret
def check_interface():
"""
查看interface
:return:
"""
s =None
for resin socket.getaddrinfo(sys.argv[1], "8728", socket.AF_UNSPEC, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
try:
s = socket.socket(af, socktype, proto)
except socket.error:
s =None
continue
try:
s.connect(sa)
except socket.error:
s.close()
s =None
continue
break
if sis None:
print('could not open socket')
sys.exit(1)
global apiros
apiros = ApiRos(s)
apiros.login(sys.argv[2], sys.argv[3])
ls = ['/interface/print', '=.proplist=name']
apiros.writeSentence(ls)
ls1 = []
while 1:
x = apiros.readSentence()
if len(x) >1:
ls1.append(x[1][6:])
if x == ['!done']or x == ['!re', '=status=finished']:
break
# print('可用interface名字:%s' % ls1)
te.insert('1.0', " 可用interface名字: %s\n" % ls1)
te.insert('1.0', "*******************************\n")
l1 = Label(root, text="输入interface名:")
l1.pack()# 这里的side可以赋值为LEFT RIGHT TOP BOTTOM
global xls
xls = Entry()
xls.pack()
l2 = Label(root, text="输入ip地址段前三位(如192.168.3):")
l2.pack()# 这里的side可以赋值为LEFT RTGHT TOP BOTTOM
global sheet
sheet = Entry()
sheet.pack()
def add_ip():
n =0
t1 = time.time()
interface = xls.get()
ip = sheet.get()
for iin range(0, 25):
address ='%s.%s/24' % (ip, i)
network ='%s.0' % ip
inputsentence = ['/ip/address/add', '=address=%s' % address, '=interface=%s' % interface,
'=network=%s' % network, '=comment=%s %s' % (address, interface)]
apiros.writeSentence(inputsentence)
x = apiros.readSentence()
# print("添加address:%s成功" % address)
te.insert('1.0', "添加ip:%s成功\n" % address)
n +=1
t2 = time.time()
t = t2 - t1
# print('添加ip数量:%s' % n)
te.insert('1.0', "添加ip数量:%s\n" % n)
print('添加ip用时:%s' % t)
te.insert('1.0', "添加ip用时:%s\n" % t)
te.insert('1.0', "*******************************\n")
def remove_ip():
"""
批量删除ip
:return:
"""
s =None
for resin socket.getaddrinfo(sys.argv[1], "8728", socket.AF_UNSPEC, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
try:
s = socket.socket(af, socktype, proto)
except socket.error:
s =None
continue
try:
s.connect(sa)
except socket.error:
s.close()
s =None
continue
break
if sis None:
print('could not open socket')
sys.exit(1)
apiros = ApiRos(s)
apiros.login(sys.argv[2], sys.argv[3])
# 批量删除ip
global count
count = -1
global n
n =0
inputsentence = ['/ip/address/print', '=.proplist=.id,comment']
apiros.writeSentence(inputsentence)
while 1:
x = apiros.readSentence()
count +=1
if len(x) >2:
# x[1]为ip 的 id
while 1:
inputsentence = ['/ip/address/remove', x[1]]
apiros.writeSentence(inputsentence)
y = apiros.readSentence()
n +=1
if y == ['!done']or y == ['!re', '=status=finished']:
break
# print("删除%s成功" % x[2])
te.insert('1.0', "删除ip:%s成功\n" % x[2])
else:
pass
if x == ['!done']or x == ['!re', '=status=finished']:
break
def start_remove_ip():
"""
执行remove_ip
:return:
"""
t1 = time.time()
while 1:
remove_ip()
if n ==0:
break
t2 = time.time()
t = t2 - t1
# print('剩余ip总数:%s' % count)
te.insert('1.0', "剩余ip总数:%s成功\n" % count)
# print('删除ip用时:%s' % t)
te.insert('1.0', "删除ip用时:%s\n" % t)
te.insert('1.0', "*******************************\n")
def backup():
"""
备份设备
:return:
"""
s =None
for resin socket.getaddrinfo(sys.argv[1], "8728", socket.AF_UNSPEC, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
try:
s = socket.socket(af, socktype, proto)
except socket.error:
s =None
continue
try:
s.connect(sa)
except socket.error:
s.close()
s =None
continue
break
if sis None:
print('could not open socket')
sys.exit(1)
apiros = ApiRos(s)
apiros.login(sys.argv[2], sys.argv[3])
inputsentence = []
inputsentence.append('/system/backup/save')
# 按日期生成文件名
filename = datetime.now().strftime('%Y%m%d %H%M%S')
inputsentence.append(' =name=%s.backup' % filename)
apiros.writeSentence(inputsentence)
while 1:
x = apiros.readSentence()
print(x)
if x == ['!done']or x == ['!re', '=status=finished']:
# print("备份成功")
te.insert('1.0', "备份:%s.backup成功\n" % filename)
break
te.insert('1.0', "*******************************\n")
def check_ip():
"""
查看ip
:return:
"""
s =None
for resin socket.getaddrinfo(sys.argv[1], "8728", socket.AF_UNSPEC, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
try:
s = socket.socket(af, socktype, proto)
except socket.error:
s =None
continue
try:
s.connect(sa)
except socket.error:
s.close()
s =None
continue
break
if sis None:
print('could not open socket')
sys.exit(1)
apiros = ApiRos(s)
apiros.login(sys.argv[2], sys.argv[3])
# 查看ip
inputsentence = ['/ip/address/print', ]
apiros.writeSentence(inputsentence)
count1 = -1
while 1:
x = apiros.readSentence()
count1 +=1
if x == ['!done']or x == ['!re', '=status=finished']:
break
print('ip总数:%s' % count1)
te.insert('1.0', "ip总数:%s\n" % count1)
te.insert('1.0', "*******************************\n")
def check_route():
"""
查看route
:return:
"""
s =None
for resin socket.getaddrinfo(sys.argv[1], "8728", socket.AF_UNSPEC, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
try:
s = socket.socket(af, socktype, proto)
except socket.error:
s =None
continue
try:
s.connect(sa)
except socket.error:
s.close()
s =None
continue
break
if sis None:
print('could not open socket')
sys.exit(1)
apiros = ApiRos(s)
apiros.login(sys.argv[2], sys.argv[3])
# 查看route
inputsentence = ['/ip/route/print', ]
apiros.writeSentence(inputsentence)
count2 = -1
while 1:
x = apiros.readSentence()
count2 +=1
if x == ['!done']or x == ['!re', '=status=finished']:
break
# print('route总数:%s' % count2)
te.insert('1.0', "route总数:%s\n" % count2)
te.insert('1.0', "*******************************\n")
def add_route():
"""
批量添加route
:return:
"""
gateway = xls.get()
ip = sheet.get()
for iin range(0, 10):
dst_address ='%s.%s.0/24' % (ip[:ip.rfind('.')], i)
# pref_src = '%s.%s.%s' % (ip, i, i)
inputsentence = ['/ip/route/add', '=dst-address=%s' % dst_address, '=gateway=%s' % gateway,
'=comment=%s ' % dst_address]
apiros.writeSentence(inputsentence)
x = apiros.readSentence()
# print("添加route:%s成功" % dst_address)
te.insert('1.0', "添加route:%s成功\n" % dst_address)
te.insert('1.0', "*******************************\n")
def remove_route():
"""
批量删除route
:return:
"""
s =None
for resin socket.getaddrinfo(sys.argv[1], "8728", socket.AF_UNSPEC, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
try:
s = socket.socket(af, socktype, proto)
except socket.error:
s =None
continue
try:
s.connect(sa)
except socket.error:
s.close()
s =None
continue
break
if sis None:
print('could not open socket')
sys.exit(1)
apiros = ApiRos(s)
apiros.login(sys.argv[2], sys.argv[3])
# 批量删除route
global count
count = -1
global n
n =0
inputsentence = ['/ip/route/print', '?>comment=', '=.proplist=.id,gateway']
apiros.writeSentence(inputsentence)
while 1:
x = apiros.readSentence()
count +=1
if len(x) ==3:
inputsentence = ['/ip/route/remove', x[1]]
apiros.writeSentence(inputsentence)
y = apiros.readSentence()
n +=1
# print ("删除%s成功" % x[2])
te.insert('1.0', "删除%s成功\n" % x[2])
if x == ['!done']or x == ['!re', '=status=finished']:
break
def start_remove_route():
"""
执行 remove_route
:return:
"""
t1 = time.time()
t2 = t1 +15
while 1:
remove_route()
t3 = time.time()
if t3 > t2:
te.insert('1.0', "*******************************\n")
break
def remove_re():
"""
route去重
:return:
"""
s =None
for resin socket.getaddrinfo(sys.argv[1], "8728", socket.AF_UNSPEC, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
try:
s = socket.socket(af, socktype, proto)
except socket.error:
s =None
continue
try:
s.connect(sa)
except socket.error:
s.close()
s =None
continue
break
if sis None:
print('could not open socket')
sys.exit(1)
apiros = ApiRos(s)
apiros.login(sys.argv[2], sys.argv[3])
# 批量删除重复route
global count
count = -1
global n
n =0
inputsentence = ['/ip/route/print', '?=active=false', '=.proplist=.id,gateway']
apiros.writeSentence(inputsentence)
while 1:
x = apiros.readSentence()
count +=1
if len(x) ==3:
inputsentence = ['/ip/route/remove', x[1]]
apiros.writeSentence(inputsentence)
y = apiros.readSentence()
n +=1
# print("删除%s成功" % x[2])
te.insert('1.0', "去重%s成功\n" % x[2])
if x == ['!done']or x == ['!re', '=status=finished']:
break
def start_remove_re():
"""
执行remove_re
:return:
"""
t1 = time.time()
t2 = t1 +15
while 1:
remove_re()
t3 = time.time()
if t3 > t2:
te.insert('1.0', "*******************************\n")
break
te = Text()
te.pack()
Button(root, text="备份设备", bg='green', command=backup).pack(padx=10, pady=10, ipadx=3, side=LEFT)
Button(root, text="查看可用interface", bg='green', command=check_interface).pack(padx=5, pady=20, side=LEFT)
Button(root, text="查看ip", bg='green', command=check_ip).pack(padx=5, pady=10, ipadx=3, side=LEFT)
Button(root, text="查看route", bg='green', command=check_route).pack(padx=5, pady=15, ipadx=3, side=LEFT)
Button(root, text="删除ip", bg='#ff3300', command=start_remove_ip).pack(padx=5, pady=10, ipadx=3, side=RIGHT)
Button(root, text="删除route", bg='#ff3300', command=start_remove_route).pack(padx=5, pady=10, ipadx=3, side=RIGHT)
Button(root, text="route去重", bg='#ff3300', command=start_remove_re).pack(padx=5, pady=10, ipadx=3, side=RIGHT)
Button(root, text="增加ip", bg='#9900ff', command=add_ip).pack(padx=5, pady=10, ipadx=3, side=RIGHT)
Button(root, text="增加route", bg='#9900ff', command=add_route).pack(padx=5, pady=10, ipadx=3, side=RIGHT)
root.mainloop()