2021-05-06

简介

https://www.jianshu.com/p/30cbbc000d67

https://blog.csdn.net/BearStarX/article/details/91791898

https://debugtalk.com/post/head-first-locust-advanced-script/

https://cloud.tencent.com/developer/article/1594240 (Locust官方文档(API)解读(全)

Locust是一款易于使用的分布式用户负载测试工具。它用于对网站(或其他系统)进行负载测试,并确定系统可以处理多少并发用户。

Locust完全基于事件,因此可以在一台计算机上支持数千个并发用户。与许多其他基于事件的应用程序相比,它不使用回调。相反,它通过协程(gevent)机制使用轻量级过程。每个蝗虫蜂拥到你的网站实际上是在自己的进程内运行(或者是greenlet)。

Locust的并发机制摒弃了进程和线程,采用协程(gevent)的机制。采用多线程来模拟多用户时,线程数会随着并发数的增加而增加,而线程之间的切换是需要占用资源的,IO的阻塞和线程的sleep会不可避免的导致并发效率下降;正因如此,LoadRunner和Jmeter这类采用进程和线程的测试工具,都很难在单机上模拟出较高的并发压力。而协程和线程的区别在于,协程避免了系统级资源调度,由此大幅提高了性能。正常情况下,单台普通配置的测试机可以生产数千并发压力,这是LoadRunner和Jmeter都无法实现的。

注意:locust 在wins 上的性能比较差 ,所以测试时候 ,压力机最好是Linux的

1. 安装

在装好python的机器上 使用 pip install locust 安装

2. Locust 入门使用

Locust 主要使用的类有 2个 : 1)Taskset 2)HttpUser(http请求使用)。 locust 是基于requests库的,所以使用起来比较简单,由于基于requests 所以请求可以保留session ,自动保留登录态,token也可以脚本上保存。 但是requests的并发能力不行 ,官方建议大并发时使用 FastHttpLocust做为客户端

例子:

from locust import HttpLocust, TaskSet, task class WebsiteTasks(TaskSet): def on_start(self): self.client.post("/login", { "username": "test", "password": "123456" }) @task(2) def index(self): self.client.get("/") @task(1) def about(self): self.client.get("/about/") class WebsiteUser(HttpLocust): task_set = WebsiteTasks host = "http://debugtalk.com" min_wait = 1000 max_wait = 5000

class HttpLocust(Locust)

client : 对应着虚拟用户作为客户端所具备的请求能力,也就是请求发起器,基于requests

task_set: 指向一个TaskSet类,TaskSet类定义了用户的任务信息,该属性为必填;

max_wait/min_wait: 每个用户执行两个任务间隔时间的上下限(毫秒),具体数值在上下限中随机取值,若不指定则默认间隔时间固定为1秒;

host:被测系统的host,当在终端中启动locust时没有指定--host参数时才会用到;

weight:同时运行多个Locust类时会用到,用于控制不同类型任务的执行权重。

class TaskSet

性能测试工具要模拟用户的业务操作,就需要通过脚本模拟用户的行为。在前面的比喻中说到,TaskSet类好比蝗虫的大脑,控制着蝗虫的具体行为。

TaskSet类实现了虚拟用户所执行任务的调度算法,包括规划任务执行顺序(schedule_task)、挑选下一个任务(execute_next_task)、执行任务(execute_task)、休眠等待(wait)、中断控制(interrupt)等等。在此基础上,就可以在TaskSet子类中采用非常简洁的方式来描述虚拟用户的业务测试场景,对虚拟用户的所有行为(任务)进行组织和描述,并可以对不同任务的权重进行配置。

在TaskSet子类中定义任务信息时,可以采取两种方式,@task装饰器和tasks属性。

from locust import TaskSet, task class UserBehavior(TaskSet): @task(1) def test_job1(self): self.client.get('/job1') @task(2) def test_job2(self): self.client.get('/job2')

from locust import TaskSet def test_job1(obj): obj.client.get('/job1') def test_job2(obj): obj.client.get('/job2') class UserBehavior(TaskSet): tasks = {test_job1:1, test_job2:2} # tasks = [(test_job1,1), (test_job1,2)] # 两种方式等价

on_start 和 on_stop 方法

User(和 TaskSet)可以声明 on _ start 方法、 on _ stop 方法。用户在开始运行时调用 on _ start 方法,在停止运行时调用 on _ stop 方法。对于 TaskSet,当模拟用户开始执行 TaskSet 时调用 on _ start 方法,当模拟用户停止执行 TaskSet 时调用 on _ stop。 类似setUp 和 teardown方法,在初始化的时候只执行一次。

  • 关联
  • 参数化 (可使用python 的list 和quere实现)
  • 检查点
  • 集合点

具体做法是,在WebsiteUser定义一个数据集,然后所有虚拟用户在WebsiteTasks中就可以共享该数据集了。如果不要求数据唯一性,数据集选择list数据结构,从头到尾循环遍历即可;如果要求数据唯一性,数据集选择queue数据结构,取数据时进行queue.get()操作即可,并且这也不会循环取数据;至于涉及到需要循环取数据的情况,那也简单,每次取完数据后再将数据插入到队尾即可,queue.put_nowait(data)。

如下为一个例子:

from locust import TaskSet, task, HttpLocust import queue #保证并发测试数据唯一性,循环取数据:模拟3用户并发登录账号,总共有90个账号,要求并发登录账号不相同,但数据可循环使用。 class UserBehavior(TaskSet): @task def test_register(self): try: data = self.locust.user_data_queue.get() except queue.Empty: print('account data run out, test ended.') exit(0) print('register with user: {}, pwd: {}'\ .format(data['username'], data['password'])) payload = { 'username': data['username'], 'password': data['password'] } self.client.post('/register', data=payload) self.locust.user_data_queue.put_nowait(data) class WebsiteUser(HttpLocust): host = 'https://debugtalk.com' task_set = UserBehavior user_data_queue = queue.Queue() for index in range(100): data = { "username": "test%04d" % index, "password": "pwd%04d" % index, "email": "test%04d@debugtalk.test" % index, "phone": "186%08d" % index, } user_data_queue.put_nowait(data) min_wait = 1000 max_wait = 3000

Locust运行模式

1. 单进程

2. 单台机器 开启 多个slave (按cpu的核数,可开启等于N的数量(包括master))

3. 分布式多台机器

多进程分布式运行

1. 先启动一个 master

2. 在启动多个slave

开启master, 用--master

$ locust -H http://yingzi-dataflow-smart-gateway.stage.yingzi.com -f locusttest.py --web-host=127.0.0.1 --master --port=8089

启动slaver ,用 --slave

$ locust -H http://yingzi-dataflow-smart-gateway.stage.yingzi.com -f locusttest.py --slave

如果slave与master不在同一台机器上,还需要通过--master-host参数再指定master的IP地址。

$ locust -H http://yingzi-dataflow-smart-gateway.stage.yingzi.com -f locusttest.py --slave --master-host=<locust_machine_ip>

demo

from locust import TaskSet,task,HttpUser,User,between import os from locust.contrib.fasthttp import FastHttpUser import json # import gevent # from gevent import monkey # gevent.monkey.patch_all() class TestCase(TaskSet): _token='' def on_start(self): url='/api/v1/auth/token' data = { "accessType": "RESTFUL", "appKey": "yingzi-idp-share", "clientIp": "172.19.61.254", "deviceId": "", "secretKey": "7he9JB74675993tz260yMZyik79iK2au" } headers = { 'Content-Type': 'application/json;charset=UTF-8', 'Accept': 'application/json, text/plain, /', 'Accept-Encoding': 'gzip, deflate', 'Accept-Language': 'zh-CN,zh;q=0.9', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.125 Safari/537.36' } res = self.client.post(url, data=json.dumps(data), headers=headers,catch_response=True) self._token=res.json()['data']['access_token'] print(self._token) @task(10) def getDataBySql(self): access_token= self._token print(access_token) url='/api/v1/data/common/getDataBySql?access_token=%s'%access_token headers = { 'Content-Type': 'application/json;charset=UTF-8', 'Accept': 'application/json, text/plain, /', 'Accept-Encoding': 'gzip, deflate', 'Accept-Language': 'zh-CN,zh;q=0.9', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.125 Safari/537.36' } data={ "act": "ihp.stage", "isPage": True, "page": 0, "requestId": "string", "searchId": 1, "size": 0, "sql": "select * from ihp_share.ihp_quarantine_onsite_appendix_ear_tag" } res=self.client.post(url,data=json.dumps(data),headers=headers,catch_response=True) print(res.status_code) class TestLocust(FastHttpUser): host='http://yingzi-dataflow-smart-gateway.stage.yingzi.com' tasks = [TestCase] min_wait=1000 max_wait=2000 # 有三个内置的等待时间函数: # constant 固定时间 # between 在最小值和最大值之间的随机时间 #wait_time = between(5, 15) # constant_pacing一个自适应时间,确保任务每x秒运行一次 if name == "main": os.system("locust -f locusttest.py --web-host=127.0.0.1") #os.system('locust -f locust_study.py --csv=result --no-web -c2000 -r2000 -t120s --loglevel=INFO --logfile=test.log') # $ locust -f examples/basic.py --csv=example --no-web -t10m # --no-web 表示不使用web界面运行测试 # -c 设置虚拟用户数 # -r 设置每秒启动虚拟用户数 # -t 设置运行时间 # -P, --port:指定web端口,默认为8089. # -n, --num - request:指定总执行测试; # -r, --hatch - rate:指定并发加压速率,默认值位1。即用户数增加的速度

用例管理

@ tag 装饰器

通过使用@tag 装饰器标记任务,您可以使用 -- tags 和 -- exclude-tags 参数来挑选测试期间执行的任务。例子:

from locust import User, constant, task, tag class MyUser(User): wait_time = constant(1) @tag('tag1') @task def task1(self): pass @tag('tag1', 'tag2') @task def task2(self): pass @tag('tag3') @task def task3(self): pass @task def task4(self): pass

如果您使用 -- tag1开始这个测试,那么在测试期间只会执行 task1和 task2。如果以 -- tag2 tag3开始,那么只执行 task2和 task3。

--exclude-tags 则相反,是做排除选择。因此,如果以 --exclude-tags tag3开始测试,则只执行 task1、 task2和 task4。排除总是优先于包含,因此如果一个任务包含一个您已经包含的标记和一个您已经排除的标记,那么它将不会被执行。

增加断言

   对于测试来说,每个自动化测试用例都应该有个断言判断,这样才能知道测试用例的成功/失败。

   在Python+Locust中,可以使用catch_response参数,with-statement和对response.failure()的调用将请求标记为失败。

with self.client.post(url=url, data=params, timeout=10, catch_response=True) as response: if response.status_code == 200: response.success() else: response.failure('Failed!')

    注意:catch_response=True这个参数是必须要加的,否则在性能测试时,后台会一直报错,提示AttributeError: 'Response' object has no attribute 'success'。

    刚刚的样例中,是举例断言status_code是否等于200,如果是,则返回成功,反之返回失败。

TaskSets可嵌套

TaskSet的一个非常重要的特性是它们可以嵌套,因为真实的网站通常以分层的方式构建,包含多个子部分。因此,嵌套任务集将允许我们定义一种行为,以更现实的方式来模拟用户。

TaskSet还可以嵌套:参考下面的代码

第一种:

class ForumPage(TaskSet): @task(20) def read_thread(self): pass @task(1) def new_thread(self): pass @task(5) def stop(self): self.interrupt() class UserBehaviour(TaskSet): tasks = {ForumPage:10} @task def index(self): pass

嵌套TaskSet的方式就像使用task属性指定任务时一样,但不是引用python函数,而是引用另一个TaskSet:

在上面的示例中,如果在执行UserBehaviour TaskSet时选择了要执行的ForumPage,则ForumPage TaskSet将开始执行。然后,ForumPage TaskSet将选择其自己的任务之一,执行它,等待,依此类推。

关于上述示例,需要注意一点,那就是在ForumPage的stop方法中调用 self.interrupt()。这样做实际上是停止执行ForumPage任务集,并在UserBehaviour实例中继续执行。如果我们在ForumPage的某处没有调用interrupt()方法,Locust将永远不会停止运行已经启动的ForumPage任务。但是通过使用中断功能,我们可以与任务权重一起定义模拟用户离开论坛的可能性。

也可以使用@task装饰器在类中内联声明嵌套的TaskSet,就像声明普通任务时一样:

class MyTaskSet(TaskSet): @task class SubTaskSet(TaskSet): @task def my_task(self): pass

顺序执行

TaskSequence类

TaskSequence类是一个TaskSet,但是它的任务是按顺序执行的。

@seq_task(1) def first_task(self): pass @seq_task(2) def second_task(self): pass @seq_task(3) @task(10) def third_task(self): pass

在上面的示例中,执行顺序定义为先执行first_task,然后执行second_task,最后执行Third_task 10次。可以看到,可以使用@task装饰器组合@seq_task,当然也可以将TaskSet嵌套在TaskSequences中,反之亦然。

请求结果统一判断

  1. 设置容错,错误信息属正常返回,此类情况可将返回指定错误信息的请求结果设置成success
  2. 设置超时,请求消耗时间大于预期的情况,可将次请求设置为fail,并在错误信息中标注失败原因
  3. 规范输出格式,请求失败是打印请求头,请求体,url等详细信息

def request_result(response, *args, **kwargs): """ 响应结果判断 :param response:响应体 :param args: 自定义 -- 可用来设置容错信息 格式[{"status_code": 400, "message": "error_1"}, {"status_code": 400, "message": "error_2"}] :param kwargs: 自定义 -- 可用来保存请求详细信息,请求失败时输出详细内容 :return: """ # 判断当前请求状态码是否为200 if response.status_code == 200: try: response_json = response.json() except JSONDecodeError as e: logging.error(e.repr()) # 返回的数据不是json格式 request_failure(response, "HTTP标准响应状态码不是200!!!", **kwargs) else: # 容错判断代码 TODO: 容错代码 if 执行通过: # 结果校验通过执行一下代码: events.request_success.fire( request_type=response.locust_request_meta["method"], name=response.locust_request_meta["name"], response_time=response.locust_request_meta["response_time"], response_length=response.locust_request_meta["content_size"], ) response._is_reported = True else: # 结果校验失败调用request_failure # 失败有可能是response_time超预期时间 # if response.locust_request_meta["response_time"] > 6000: # 响应时间超6秒,结果设为失败 request_failure(response, "error_message", **kwargs) else: request_failure(response, "HTTP标准响应状态码不是200!!!", **kwargs) def request_failure(response, err_message="", **kwargs): exception_message = "" if err_message: exception_message += "失败信息为:{},".format(err_message) if "响应超时" not in err_message: exception_message += "--- 接口返回结果: {}".format(response.text) if kwargs: for key in kwargs.keys(): exception_message += ", 请求{}为: {}".format(key, kwargs[key]) logging.error(exception_message) events.request_failure.fire( request_type=response.locust_request_meta["method"], name=response.locust_request_meta["name"], response_time=response.locust_request_meta["response_time"], response_length=response.locust_request_meta["content_size"], exception=exception_message ) response._is_reported = True

其他补充

第一个压测实例 压测 MQTT:

https://blog.csdn.net/qq_39214101/article/details/107997414

开始第二个实例压kafka

import time from locust import TaskSet, task, Locust, events from kafka import KafkaProducer import json class UserBehavior(TaskSet): def on_start(self): self.producer = KafkaProducer(bootstrap_servers=['x.x.x.x:9092']) def on_stop(self): # 该方法当程序结束时每用户进行调用,关闭连接 self.producer.close() @task(1) def sendAddCmd(self): start_time = time.time() time_unique_id = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime()) print(time_unique_id) print("===========================",start_time) try: timestamp = int(time.time()) message = { 'timestamp': timestamp, 'message': "121314" } msg = json.dumps(message) msg = msg.encode('utf-8') self.producer.send('preHandleTopic', msg) except Exception as e: total_time = int((time.time() - start_time) * 1000) events.request_failure.fire(request_type="kafka", name="add", response_time=total_time,response_length=0, exception=e) else: total_time = int((time.time() - start_time) * 1000) events.request_success.fire(request_type="kafka", name="add", response_time=total_time,response_length=0) class SocketUser(Locust): task_set = UserBehavior min_wait = 1000 # 单位毫秒 max_wait = 1000 # 单位毫秒 if name == 'main': import os os.system("locust -f SendKafka.py --host=x.x.x.x:9092")

开始第三个实例压tcp

from locust import HttpLocust, TaskSet, task import socket # 导入 socket 模块 host = socket.gethostname() # 获取本地主机名 port = 8888 # 设置端口号 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((host, port)) class UserBehaviora(TaskSet): def on_start(self): print("start") @task(1) def bky_index(self): sock.send(b'121314') re_mes = sock.recv(1024).decode() print(re_mes) class WebsiteUser(HttpLocust): task_set = UserBehaviora min_wait = 1500 max_wait = 5000 if name == "main": import os os.system("locust -f locust6.py --host=x.x.x.x:xxxx")

第三方工具

支持其他采样器协议,报告等。 Locust 插件:https://github.com/SvenskaSpel/locust-plugins/

无需手动步骤即可自动执行分布式运行 Locust集群:https://github.com/SvenskaSpel/locust-swarm/(蝗虫群)

使用其他语言 Locust主服务器和Locust从服务器通过交换msgpack消息进行通信,这是许多语言所支持的。因此,您可以使用任何喜欢的语言编写Locust任务。为了方便起见,一些库充当了从属运行程序。他们运行你的Locust任务,并定期向master报告。

Golang Boomer:https://github.com/myzhan/boomer/

Java Locust4j:https://github.com/myzhan/locust4j Swarm: https://github.com/anhldbk/swarm

配置管理 部署Locust很容易,但是有些工具仍然可以提供一定程度的便利。

tinx.locust是Ansible的一个安装角色,用于配置和控制Locust系统服务,或使用ansible-container构建Locust docker映像。还管理locustfile和相应的测试数据。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容