from locust import Locust, events, task, TaskSet,User
import websocket1
import time
import gzip
import json
class WebSocketClient():
def __init__(self, host, port):
self.host = host
self.port = port
class WebSocketLocust(User):
def __init__(self, *args, **kwargs):
self.client = WebSocketClient("172.31.15.85", 9503)
class UserBehavior(TaskSet):
ws = websocket1.WebSocket()
# self.ws.connect("ws://10.98.64.103:8807")
ws.connect("ws://www.baidu.com")
@task(1)
def buy(self):
try:
start_time = time.time()
# self.ws.send('{"url":"/buy","data":{"id":"123","issue":"20170822","doubled_num":2}}')
# result = self.ws.recv()
send_info = '{"wsCode":"transfer","transCode":"10003","transNum":time.time(),"data":{"calledPhone":332000,"gateway":"SHTelecom","passBack":1631959314205}}'
# tasks = send_info
# print(send_info)
# send_info_msg = json.loads(send_info)
# print(send_info_msg)
# send_info = '{"event":"subscribe", "channel":"btc_usdt.deep"}'
while True:
# time.sleep(5)
# ws.send(json.dumps(send_info))
ws.send(send_info)
while (1):
compressData = ws.recv()
result = gzip.decompress(compressData).decode('utf-8')
if result[:7] == '{"ping"':
ts = result[8:21]
pong = '{"pong":' + ts + '}'
ws.send(pong)
ws.send(send_info)
else:
# print(result)
with open('./test_result.txt', 'a') as f:
# f.write(threading.currentThread().name + '\n')
f.write(result + '\n')
except Exception as e:
print("error is:", e)
class ApiUser(WebSocketLocust):
tasks = UserBehavior
min_wait = 100
max_wait = 200
2021-10-13
最后编辑于 :
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
相关阅读更多精彩内容
- 培生幼儿英语分级阅读·预备级 图片来源于网络,若侵权联删 整套书总共60本,里面涵盖了生活常识,礼仪,节日,等元素...