语言版本:python2.7
一:dnsClient.py
# -*- coding: utf-8 -*-
# @Time : 2020-01-08 15:32
# @Author : xiaobin
# @File : dnsClient.py
# @Software : PyCharm
import time
from locust import (TaskSet, task, events, Locust)
from gevent._semaphore import Semaphore
import socket

# Spawn gate: the semaphore is acquired immediately so it starts out held and
# is only released by on_hatch_complete() below.
# NOTE(review): nothing visible in this file ever waits on the semaphore --
# presumably a task's on_start is meant to call all_locusts_spawned.wait();
# confirm, otherwise the gate has no effect.
all_locusts_spawned = Semaphore()
all_locusts_spawned.acquire()


def on_hatch_complete(**kwargs):
    """Release the spawn gate.

    Registered below as a listener on ``events.hatch_complete``; any keyword
    arguments supplied by the event are accepted and ignored.
    """
    all_locusts_spawned.release()


events.hatch_complete += on_hatch_complete
class dnsClient(object):
    """Small DNS lookup client that reports each lookup to Locust's events."""

    def look_host(self, hostname):
        """Resolve *hostname* once and fire the matching Locust request event.

        The elapsed time in whole milliseconds is printed and then reported
        through ``events.request_success`` (name ``dns``) when the lookup
        worked, or ``events.request_failure`` (name ``dns-error``) when it
        did not.

        :param hostname: host name (or IP address string) to look up.
        """
        start_time = time.time()
        # NOTE(review): this fixed 195 ms pause sits inside the timed window,
        # so it is added to every reported response time -- presumably
        # deliberate (pacing / simulated latency); confirm.
        time.sleep(0.195)
        error = self._lookup(hostname)
        total_time = int((time.time() - start_time) * 1000)
        print(total_time)
        if error is None:
            events.request_success.fire(
                request_type='dns-test',
                name=r'dns',
                response_time=total_time,
                response_length=0)
        else:
            # Failed lookups must be reported through request_failure;
            # firing request_success here would count them as successes.
            events.request_failure.fire(
                request_type='dns-test',
                name=r'dns-error',
                response_time=total_time,
                exception=error)

    def get_host(self, hostname):
        """Return True when *hostname* resolves, False when the lookup fails.

        :param hostname: host name (or IP address string) to look up.
        :returns: bool -- whether ``socket.gethostbyaddr`` succeeded.
        """
        return self._lookup(hostname) is None

    def _lookup(self, hostname):
        """Run the lookup; return None on success or the socket error raised."""
        try:
            socket.gethostbyaddr(hostname)
        except socket.error as e:
            # socket.error is the parent of both herror and gaierror, so an
            # unresolvable name (gaierror) is handled here instead of
            # propagating out of the task.
            print(e)
            return e
        return None
if __name__ == '__main__':
    # Manual smoke run: perform one timed lookup of the test domain.
    dnsClient().look_host('dnstest.com')
二:locustDns.py
# -*- coding: utf-8 -*-
# @Time : 2020-01-08 15:47
# @Author : xiaobin
# @File : locustDns.py
# @Software : PyCharm
from locust import (TaskSet, task, events, Locust)
import core.dns_test.dnsClient


class locustDns(Locust):
    """Locust user base class whose ``client`` attribute is a dnsClient."""

    def __init__(self):
        super(locustDns, self).__init__()
        # Replace the default client with the DNS lookup client.
        self.client = core.dns_test.dnsClient.dnsClient()
三:LocustRun.py
# -*- coding: utf-8 -*-
# @Time : 2020-01-08 15:52
# @Author : xiaobin
# @File : LocustRun.py
# @Software : PyCharm
import sys
import os

# Append the parent of this file's directory to sys.path before the project
# import below -- presumably that parent is where the ``core`` package lives;
# verify against the project layout.
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = os.path.split(curPath)[0]
sys.path.append(rootPath)

from locust import *
import core.dns_test.locustDns
class LocustRun(TaskSet):
    """Task set with a single task: one DNS lookup through ``self.client``."""

    def setup(self):
        """Print a marker from the ``setup`` hook."""
        print('task setup')

    def teardown(self):
        """Print a marker from the ``teardown`` hook."""
        print('task teardown')

    def on_start(self):
        """Print a marker from the ``on_start`` hook."""
        print('start')

    def on_stop(self):
        """Print a marker from the ``on_stop`` hook."""
        # Runs when a virtual user finishes its tasks.
        print('end')

    @task(1)
    def test_dns(self):
        """Look up the test domain once via the client's ``look_host``."""
        target = 'dnstest.com'
        self.client.look_host(target)
class myrun(core.dns_test.locustDns.locustDns):
    """Runnable user class: binds the LocustRun task set with zero wait."""

    # NOTE(review): no code visible in this file reads ``host``.
    host = 'http://127.0.0.1:8888'
    task_set = LocustRun
    # No pause between consecutive tasks.
    min_wait = max_wait = 0
四:运行 locust -f LocustRun.py 即可开始压测。
注意:运行前请先在本地 /etc/hosts 中添加测试域名(例如 dnstest.com)。