【Python】【压力测试】Locust压力测试工具

Python011

【Python】【压力测试】Locust压力测试工具,第1张

性能测试参数

熟悉 Apache ab 工具的同学都知道,它是没有界面的,通过命令行执行。 Locust 同样也提供的命令行运行,好处就是更节省客户端资源。

启动参数:

--no-web 表示不使用Web界面运行测试。

-c 设置虚拟用户数。

-r 设置每秒启动虚拟用户数。

-t 设置设置运行时间。

出现的报错及解决办法:

使用Locust进行性能测试,Locust no-web模式执行命令locust -f zkxl_verify_ locust.py --host= https://www.baidu.com --no-web -c 10 -r 2 -t 1m

提示locust: error: unrecognized arguments: --no-web -c

参考locust官方文档 https://docs.locust.io/en/latest/running-locust-without-web-ui.html?highlight=no-web

将命令参数--no-web 更改为 --headless,将命令中指定用户并发数的参数 -c 改为 -u,即更改命令为:locust -f zkxl_verify_ locust.py --host= https://www.baidu.com --headless -u 10 -r 2 -t 1m 即可.

locust的测试数据可以保存到CSV文件中,有两种方法可以进行此操作:

首先,通过Web UI运行Locust时,可以在“Download Data”选项卡下得到CSV文件。

其次,可以使用标签运行Locust,该标签将定期保存两个CSV文件。如果计划使用--no-web标签以自动化方式运行Locust

文件将被命名为example_response_times.csv 和 example_stats.csv (使用--csv=example)并记录Locust构建的信息。

如果你想要更快(慢)的写入速度,也可以自动以写入频率:

此数据将写入两个文件,并将_response_times.csv和_stats.csv添加到你提供的名称中:

打开命令提示符(或Linux终端),输入 locust --help 。

参考: 官方文档

一旦单台机器不够模拟足够多的用户时,Locust支持运行在多台机器中进行压力测试。

为了实现这个,你应该在 master 模式中使用 --master 标记来启用一个 Locust 实例。这个实例将会运行你启动测试的 Locust 交互网站并查看实时统计数据。master 节点的机器自身不会模拟任何用户。相反,你必须使用 --slave 标记启动一台到多台 Locustslave 机器节点,与标记 --master-host 一起使用(指出master机器的IP/hostname)。

常用的做法是在一台独立的机器中运行master,在slave机器中每个处理器内核运行一个slave实例。

在 master 模式下启动 Locust:

在每个 slave 中执行(192.168.0.14 替换为你 msater 的IP):

参数

--master

设置 Locust 为 master 模式。网页交互会在这台节点机器中运行。

--slave

设置 Locust 为 slave 模式。

--master-host=X.X.X.X

可选项,与 --slave 一起结合使用,用于设置 master 模式下的 master 机器的IP/hostname(默认设置为127.0.0.1)

--master-port=5557

可选项,与 --slave 一起结合使用,用于设置 master 模式下的 master 机器中 Locust 的端口(默认为5557)。注意,locust 将会使用这个指定的端口号,同时指定端口+1的号也会被占用。因此,5557 会被使用,Locust将会使用 5557 和 5558。

--master-bind-host=X.X.X.X`

可选项,与 --master 一起结合使用。决定在 master 模式下将会绑定什么网络接口。默认设置为*(所有可用的接口)。

--master-bind-port=5557

可选项,与 --master 一起结合使用。决定哪个网络端口 master 模式将会监听。默认设置为 5557。注意 Locust 会使用指定的端口号,同时指定端口+1的号也会被占用。因此,5557 会被使用,Locust 将会使用 5557 和 5558。

--expect-slaves=X

no-web 模式下启动 master 时使用。master 将等待X连接节点在测试开始之前连接。

如下图,我启动了一个 master 和两个 slave,由两个 slave 来向被测试系统发送请求。

client属性:

TaskSet类:实现了虚拟用户所执行任务的调度算法,包括规划任务执行顺序(schedule_task)、挑选下一个任务(execute_next_task)、执行任务(execute_task)、休眠等待(wait)、中断控制(interrupt)等等。

在此基础上,我们就可以在TaskSet子类中采用非常简洁的方式来描述虚拟用户的业务测试场景,对虚拟用户的所有行为(任务)进行组织和描述,并可以对不同任务的权重进行配置。

在TaskSet子类中定义任务信息时,可以采取两种方式, @task 装饰器和 tasks 属性。

@task(1)中的数字表示任务的执行频率,数值越大表示执行的频率越高

采用tasks属性定义任务:

tasks = {test_job1:1, test_job2:2}中,test_job1:1,test_job2:2表示事件执行的频率,即test_job2的执行频率是test_job1的两倍

on_start函数是在Taskset子类中使用比较频繁的函数。在正式执行测试前执行一次,主要用于完成一些初始化的工作。

例如,当测试某个搜索功能,而该搜索功能又要求必须为登录态的时候,就可以先在on_start中进行登录操作,HttpLocust使用到了requests.Session,因此后续所有任务执行过程中就都具有登录态了

在TaskSequence类中,[email protected]_task()可以用来控制任务的执行顺序;里面的数值越小执行越靠前;

在Taskset类中,内置WAIT_TIME功能,它用于确定模拟用户在执行任务之间将等待多长时间。Locust提供了一些内置的函数,返回一些常用的wait_time方法。

1、 between(min,max)函数 :用得比较多的函数

wait_time = between(3.0, 10.5):任务之间等待的时间是3到10.5秒之间的任意时间

还可以用任意函数来定义等待时间, 比如平均1秒的等待时间

wait_time = lambda self: random.expovariate(1) 1000

2、 constant(number) 函数:

wait_time=constant(3):任务之间等待的时候是3秒钟,且等待的时候不能超过任务运行的总时间,也就是在执行py文件时设置的时间

3、 constant_pacing(number) *函数:

wait_time=constant_pacing(3):所以任务每隔3秒执行,但是当到达运行的总时间时,任务运行结束;

现实中有很多任务其实也是有嵌套结构的,比如用户打开一个网页首页后,用户可能会不喜欢这个网页直接离开,或者喜欢就留下来,留下来的话,可以选择看书、听音乐、或者离开;

在有Taskset嵌套的情况下,执行子任务时, 通过 self.interrupt() 来终止子任务的执行, 来回到父任务类中执行, 否则子任务会一直执行;

在上一页的案例中,在stay这个类中,对interrupt()方法的调用是非常重要的,这可以让一个用户跳出stay这个类有机会执行leave这个任务,否则他一旦进入stay任务就会一直在看书或者听音乐而难以自拔。

在进行接口多用户并发测试时,数据的重复使用可能会造成脚本的失败,那么需要对用户数据进行参数化来使脚本运行成功。

已登录功能为例:

创建 login_user() 方法,定义登录字典 users , 通过randint 随机获取字典中的用户数据。

在 login() 登录任务中,调用 login_user() 方法实现 随机用户的登录。

在此我们举出百度搜索的例子,假设每个人搜索的内容是不相同的;那么我们可以假设把数据放到队列中,然后从队列中依次把数据取出来;

可以利用python中Queue队列来进行处理;

Queue的种类

Queue.Queue(maxsize=0):先进先出队列

Queue.LifoQueue(maxsize=0):后进先出队列

Queue.PriorityQueue(maxsize=0):构造一个优先队列

参数maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制

Queue的基本方法

个别情况下测试数据可重复使用,因此我们可以把参数化数据定义为一个列表,在列表中取出数据;

在某些请求中,需要携带之前response中提取的参数,常见场景就是session_id。Python中可用通过re正则匹配,对于返回的html页面,可用采用lxml库来定位获取需要的参数;

我们以Phpwind登陆的来进行举例,在登陆的接口中需要把token参数传给服务器,token的值由页的接口返回;

方法一:使用正则表达式

方法二:采用lxml库来定位获取需要的参数

技术点:

1、导模块:lxml模块

2、etree.HTML() 从返回html页面获取html文件的dom结构

3、xpath() 获取token的xpath路径

catch_response = True :布尔类型,如果设置为 True, 允许该请求被标记为失败。

通过 client.get() 方法发送请求,将整个请求的给 response, 通过 response.status_code 得请求响应的 HTTP 状态码。如果不为 200 则通过 response.failure('Failed!') 打印失败!

参考文章:

https://www.jianshu.com/p/a48f4af81e67

https://www.cnblogs.com/fnng/p/6081798.html

http://class.itest.info/locust 【虫师】

https://cloud.tencent.com/developer/article/1594240 【官方文档的中文翻译】

花了一天时间用python为服务写了个压力测试。很简单,多线程向服务器发请求。但写完之后发现如果中途想停下来,按Ctrl+C达不到效果,自然想到要用信号处理函数捕捉信号,使线程都停下来,问题解决的方法请往下看:

复制代码代码如下:

#!/bin/env python

# -*- coding: utf-8 -*-

#filename: peartest.py

import threading, signal

is_exit = False

def doStress(i, cc):

global is_exit

idx = i

while not is_exit:

if (idx <10000000):

print "thread[%d]: idx=%d"%(i, idx)

idx = idx + cc

else:

break

print "thread[%d] complete."%i

def handler(signum, frame):

global is_exit

is_exit = True

print "receive a signal %d, is_exit = %d"%(signum, is_exit)

if __name__ == "__main__":

signal.signal(signal.SIGINT, handler)

signal.signal(signal.SIGTERM, handler)

cc = 5

for i in range(cc):

t = threading.Thread(target=doStress, args=(i,cc))

t.start()

上面是一个模拟程序,并不真正向服务发送请求,而代之以在一千万以内,每个线程每隔并发数个(cc个)打印一个整数。很明显,当所有线程都完成自己的任务后,进程会正常退出。但如果我们中途想退出(试想一个压力测试程序,在中途已经发现了问题,需要停止测试),该肿么办?你当然可以用ps查找到进程号,然后kill -9杀掉,但这样太繁琐了,捕捉Ctrl+C是最自然的想法。上面示例程序中已经捕捉了这个信号,并修改全局变量is_exit,线程中会检测这个变量,及时退出。

但事实上这个程序并不work,当你按下Ctrl+C时,程序照常运行,并无任何响应。网上搜了一些资料,明白是python的子线程如果不是daemon的话,主线程是不能响应任何中断的。但设为daemon后主线程会随之退出,接着整个进程很快就退出了,所以还需要在主线程中检测各个子线程的状态,直到所有子线程退出后自己才退出,因此上例29行之后的代码可以修改为:

复制代码代码如下:

threads=[]

for i in range(cc):

t = threading.Thread(target=doStress, args=(i, cc))

t.setDaemon(True)

threads.append(t)

t.start()

for i in range(cc):

threads[i].join()

重新试一下,问题依然没有解决,进程还是没有响应Ctrl+C,这是因为join()函数同样会waiting在一个锁上,使主线程无法捕获信号。因此继续修改,调用线程的isAlive()函数判断线程是否完成:

复制代码代码如下:

while 1:

alive = False

for i in range(cc):

alive = alive or threads[i].isAlive()

if not alive:

break

这样修改后,程序完全按照预想运行了:可以顺利的打印每个线程应该打印的所有数字,也可以中途用Ctrl+C终结整个进程。完整的代码如下:

复制代码代码如下:

#!/bin/env python

# -*- coding: utf-8 -*-

#filename: peartest.py

import threading, signal

is_exit = False

def doStress(i, cc):

global is_exit

idx = i

while not is_exit:

if (idx <10000000):

print "thread[%d]: idx=%d"%(i, idx)

idx = idx + cc

else:

break

if is_exit:

print "receive a signal to exit, thread[%d] stop."%i

else:

print "thread[%d] complete."%i

def handler(signum, frame):

global is_exit

is_exit = True

print "receive a signal %d, is_exit = %d"%(signum, is_exit)

if __name__ == "__main__":

signal.signal(signal.SIGINT, handler)

signal.signal(signal.SIGTERM, handler)

cc = 5

threads = []

for i in range(cc):

t = threading.Thread(target=doStress, args=(i,cc))

t.setDaemon(True)

threads.append(t)

t.start()

while 1:

alive = False

for i in range(cc):

alive = alive or threads[i].isAlive()

if not alive:

break

其实,如果用python写一个服务,也需要这样,因为负责服务的那个线程是永远在那里接收请求的,不会退出,而如果你想用Ctrl+C杀死整个服务,跟上面的压力测试程序是一个道理。总结一下,python多线程中要响应Ctrl+C的信号以杀死整个进程,需要:

1.把所有子线程设为Daemon;

2.使用isAlive()函数判断所有子线程是否完成,而不是在主线程中用join()函数等待完成;

3.写一个响应Ctrl+C信号的函数,修改全局变量,使得各子线程能够检测到,并正常退出。