使用pandas与queue实现locust脚本参数化

Locust作为开源的压力测试工具,这几年备受关注,用法已经被大神们开发的差不多了。最近版本大更新,我重新学习一些基础用法,却发现参数化这一块的代码不能满足我的需求。所以这次我从一个小白的角度出发,争取用简单的方式,实现Locust获取外部csv数据并参数化。

在直接编写代码之前,要了解两个库:pandas与queue。(想直接看代码麻烦手动跳到最后。)

pandas——Python的数据分析支持库

Python Data Analysis Library 或 pandas 是基于NumPy 的一种工具,该工具是为了解决数据分析任务而创建的。Pandas 纳入了大量库和一些标准的数据模型,提供了高效地操作大型数据集所需的工具。

简言之,pandas高效,方便,好用。

它有两种独有的数据结构:Series 和 DataFrame。

  • Series:与Python基本的数据结构List相近,Series中只允许存储相同的数据类型
  • DataFrame:二维的表格型数据结构。很多功能与R中的data.frame类似。可以将DataFrame理解为Series的容器。

在获取csv数据时,我使用了DataFrame数据结构,这种表格形式更方便理解与操作,下面是基本用法:

  from pandas import Series,DataFrame

 # 使用dict定义DataFrame
 data = {"name":['google','baidu','yahoo'],"marks":[100,200,300],"price":[1,2,3]}

 # 读取csv文件
 reader = pandas.read_csv(file_path)

 # 将数据转换为DataFrame格式
 df = pd.DataFrame(reader)

 # 遍历行数据
 for index, row in dFrame.iterrows():
            print(index, row[i])   i为列序号,从0开始

queue——一种同步的队列类

队列queue 多应用在多线程应用中,多线程访问共享变量。对于多线程而言,访问共享变量时,队列queue是线程安全的。从queue队列的具体实现中,可以看出queue使用了1个线程互斥锁(pthread.Lock()),以及3个条件标量(pthread.condition()),来保证了线程安全。【线程安全:一个函数,当且仅当被多个并发线程反复调用时,能够一直产生正确的结果,才能够被称为线程安全的(thread-safe)】

我主要使用queue.Queue来实例化参数队列,将数据put到对列中,再从队列中get出来,传给相应请求。下面为基本用法:

  import queue

  # 实例化一个队列
  queue_data = queue.Queue()  
  # put数据
  queue_data.put_nowait(user)
  # get数据
  queue_data.get_nowait(user)

put_nowait()与get_nowait()实际为block=False的put()与get(),消息队列如果没有空间可写入/获取,则会立刻抛出“queue.Full/queue.Empty”异常

pandas与queue结合,实现参数化

  import queue
  import pandas as pd
  import sys
  from locust import task, HttpUser, between, TaskSet

  # 预防栈溢出,设置嵌套层数上限为10000
  sys.setrecursionlimit(10000)

  # 测试数据csv文件路径
  file_path = "test_data.csv"

  # 读取csv文件
  reader = pd.read_csv(file_path)
  # 将数据转换为DataFrame格式
  df = pd.DataFrame(reader)


  # 定义csv文件的单列数据获取函数
  def queue_data(dFrame, i, **kwargs):
      data = queue.Queue()  # 先进先出
      # queue_data = queue.LifoQueue() 后进先出
      for index, row in dFrame.iterrows():
          # print(index, row[i])   i为列序号,从0开始
          try:
              data.put_nowait(row[i])
          except queue.Full:
              print("队列溢出")
      return data

  class MyTaskSet(TaskSet):
      # 在task中使用queue数据
      @task
      def task1(self):
          # 使用queue中的数据进行参数化
          url = "/postData"
          try:
              row0_data = self.user.row0.get_nowait()
              row1_data = self.user.row1.get_nowait()
              payload = {"row0": row0_data, "row1": row1_data}
              print(payload)
              res = self.client.post(url, json=payload, name="使用queue中的数据进行参数化")
          except queue.Empty:
              # 队列取空后退出
              print("queue is empty")
              exit()


  class MyUser(HttpUser):
      host = "http://example.com"
      tasks = [MyTaskSet]
      wait_time = between(1, 3)
      # 获取单列数据放入对应队列中
      row0 = queue_data(df, 0)
      row1 = queue_data(df, 1)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。