来自APP Android端自动化测试初学者的笔记,写的不对的地方大家多多指教哦
测试环境
并发启动2个appium服务,再并发启动2台设备测试微博APP(V10.12.0)
2个appium服务,端口配置如下:
- Appium服务器端口:4723,bp端口为4724
- Appium服务器端口:4725,bp端口为4726
2台设备:
- U4AIUKFAL7W4MJLR
- U4AIUKFAL7W4MHUHUDS
测试app:微博APP(V10.12.0)Andriod版
这部分的代码需要跟前面三篇文章:多设备启动;python启动appium服务;检测端口占用和释放端口整合在一起,先检测端口是否占用,再启动appium服务,最后分配设备启动app。
# Appium并发测试(多进程并发启动appium服务,多进程并发启动设备,检测和释放端口)
import multiprocessing
import time
from appium import webdriver
from appium_sync.check_port_use import CheckPortUse
from appium_sync.check_port_release import CheckPortRelease
from appium_sync.multi_appium import MultiAppium
from appium_sync.multi_devices import MultiDevices
class AppiumDevices:
driver: webdriver = None
devices_list = ['U4AIUKFAL7W4MJLR', 'U4AIUKFAL7W4MHUHUDS']
"""初始化类"""
def __init__(self):
self.check_port_use = CheckPortUse()
self.check_port_release = CheckPortRelease()
self.multi_appium = MultiAppium()
self.multi_devices = MultiDevices()
'''检测端口是否被占用,如果没有被占用则启动appium服务,被占用则释放端口'''
def start_appium_action(self, host, port):
if self.check_port_use.use_port(host, port):
time.sleep(0.5)
self.multi_appium.appium_start(host, port)
return True
else:
self.check_port_release.release_port(port)
self.start_appium_action(host, port)
'''先检测appium服务端口是否被占用,占用则启动APP,没有占用则启动appium服务'''
def start_devices_action(self, udid, port):
host = '127.0.0.1'
if self.check_port_use.use_port(host, port) is not True:
self.multi_devices.appium_desire(udid, port)
else:
self.multi_appium.appium_start(host, port)
'''并发启动appium服务'''
def appium_start_sync(self):
print('====appium_start_sync=====')
# 构建appium进程组
appium_process = []
# 加载appium进程
for i in range(len(AppiumDevices.devices_list)):
host = '127.0.0.1'
port = 4723 + 2 * i
appium = multiprocessing.Process(target=self.start_appium_action, args=(host, port))
appium_process.append(appium)
# 启动appium服务
for appium in appium_process:
appium.start()
for appium in appium_process:
appium.join()
time.sleep(5)
'''并发启动设备'''
def devices_start_sync(self):
print('===devices_start_sync===')
# 定义desired进程组
desired_process = []
# 加载desired进程
for i in range(len(AppiumDevices.devices_list)):
port = 4723 + 2 * i
desired = multiprocessing.Process(target=self.start_devices_action,
args=(AppiumDevices.devices_list[i], port))
desired_process.append(desired)
# 并发启动App
for desired in desired_process:
desired.start()
for desired in desired_process:
desired.join()
if __name__ == '__main__':
appium_devices = AppiumDevices()
appium_devices.appium_start_sync()
appium_devices.devices_start_sync()
appium_devices.check_port_release.release_port('4723')
appium_devices.check_port_release.release_port('4725')