python+appium自动化测试-Appium并发测试

来自APP Android端自动化测试初学者的笔记,写的不对的地方大家多多指教哦

测试环境

并发启动2个appium服务,再并发启动2台设备测试微博APP(V10.12.0)

2个appium服务,端口配置如下:

  • Appium服务器端口:4723,bp端口为4724
  • Appium服务器端口:4725,bp端口为4726

2台设备:

  • U4AIUKFAL7W4MJLR
  • U4AIUKFAL7W4MHUHUDS

测试app:微博APP(V10.12.0)Andriod版

这部分的代码需要跟前面三篇文章:多设备启动;python启动appium服务;检测端口占用和释放端口整合在一起,先检测端口是否占用,再启动appium服务,最后分配设备启动app。

# Appium并发测试(多进程并发启动appium服务,多进程并发启动设备,检测和释放端口)
import multiprocessing
import time
from appium import webdriver
from appium_sync.check_port_use import CheckPortUse
from appium_sync.check_port_release import CheckPortRelease
from appium_sync.multi_appium import MultiAppium
from appium_sync.multi_devices import MultiDevices

class AppiumDevices:
    driver: webdriver = None
    devices_list = ['U4AIUKFAL7W4MJLR', 'U4AIUKFAL7W4MHUHUDS']

    """初始化类"""
    def __init__(self):
        self.check_port_use = CheckPortUse()
        self.check_port_release = CheckPortRelease()
        self.multi_appium = MultiAppium()
        self.multi_devices = MultiDevices()

    '''检测端口是否被占用,如果没有被占用则启动appium服务,被占用则释放端口'''
    def start_appium_action(self, host, port):
        if self.check_port_use.use_port(host, port):
            time.sleep(0.5)
            self.multi_appium.appium_start(host, port)
            return True
        else:
            self.check_port_release.release_port(port)
            self.start_appium_action(host, port)

    '''先检测appium服务端口是否被占用,占用则启动APP,没有占用则启动appium服务'''
    def start_devices_action(self, udid, port):
        host = '127.0.0.1'
        if self.check_port_use.use_port(host, port) is not True:
            self.multi_devices.appium_desire(udid, port)
        else:
            self.multi_appium.appium_start(host, port)

    '''并发启动appium服务'''
    def appium_start_sync(self):
        print('====appium_start_sync=====')
        # 构建appium进程组
        appium_process = []

        # 加载appium进程
        for i in range(len(AppiumDevices.devices_list)):
            host = '127.0.0.1'
            port = 4723 + 2 * i
            appium = multiprocessing.Process(target=self.start_appium_action, args=(host, port))
            appium_process.append(appium)

        # 启动appium服务
        for appium in appium_process:
            appium.start()
        for appium in appium_process:
            appium.join()
        time.sleep(5)

    '''并发启动设备'''
    def devices_start_sync(self):
        print('===devices_start_sync===')
        # 定义desired进程组
        desired_process = []

        # 加载desired进程
        for i in range(len(AppiumDevices.devices_list)):
            port = 4723 + 2 * i
            desired = multiprocessing.Process(target=self.start_devices_action,
                                              args=(AppiumDevices.devices_list[i], port))
            desired_process.append(desired)

        # 并发启动App
        for desired in desired_process:
            desired.start()
        for desired in desired_process:
            desired.join()

if __name__ == '__main__':
    appium_devices = AppiumDevices()
    appium_devices.appium_start_sync()
    appium_devices.devices_start_sync()
    appium_devices.check_port_release.release_port('4723')
    appium_devices.check_port_release.release_port('4725')
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容