import base64
import os
import urllib
import numpy as np
import requests, time, json, threading, random
class Presstest(object):
    """Multi-threaded HTTP stress tester that POSTs base64-encoded images.

    Run configuration (``THREAD_NUM``, ``ONE_WORKER_NUM``, ``LOOP_SLEEP``) is
    read from module-level globals, and run statistics (``INDEX``,
    ``ERROR_NUM``, ``TIME_LENS``) are written to module-level globals. All of
    them must be defined before :meth:`run` is called.
    """

    # Guards the read-modify-write updates of the INDEX / ERROR_NUM globals,
    # which every worker thread increments concurrently.
    _counter_lock = threading.Lock()

    # File-name suffixes that are treated as images to upload.
    _IMAGE_SUFFIXES = (".jpg", ".png")

    def __init__(self, press_url, image_dir="./home"):
        """Store the target URL and the directory holding the test images.

        Args:
            press_url: URL every request is POSTed to.
            image_dir: Directory scanned for ``.jpg`` / ``.png`` files on
                each task.
        """
        self.press_url = press_url
        self.image_dir = image_dir

    def test_interface(self):
        """Run one timed task and record its outcome in the module globals.

        ``INDEX`` is incremented for every task started. On success the
        elapsed seconds are appended to ``TIME_LENS``; on any exception the
        error is printed once and ``ERROR_NUM`` is incremented.
        """
        global INDEX
        global ERROR_NUM
        with self._counter_lock:
            INDEX += 1
        start = time.time()
        try:
            self.do_request(self.press_url, self.image_dir)
        except Exception as e:
            # Top-level boundary of a worker: every failure (file I/O,
            # connection error, non-JSON reply) counts as one failed task.
            print(e)
            with self._counter_lock:
                ERROR_NUM += 1
        else:
            TIME_LENS.append(time.time() - start)
            print('end')

    def test_onework(self):
        """Body of one worker thread: run ``ONE_WORKER_NUM`` tasks in a row,
        sleeping ``LOOP_SLEEP`` seconds after each one."""
        for _ in range(ONE_WORKER_NUM):
            self.test_interface()
            time.sleep(LOOP_SLEEP)

    @classmethod
    def _image_paths(cls, directory):
        """Return the full paths of the image files directly inside *directory*."""
        return [
            os.path.join(directory, name)
            for name in os.listdir(directory)
            if name.endswith(cls._IMAGE_SUFFIXES)
        ]

    def do_request(self, url, file):
        """POST every ``.jpg`` / ``.png`` file found in directory *file* to *url*.

        Each image is read, base64-encoded and sent as the JSON body
        ``{"imgdata": <base64 text>}`` with a 200 second timeout. For every
        image the file name and its elapsed time are printed, followed by the
        accumulated time once all images have been sent.

        Args:
            url: Endpoint that receives the POST requests.
            file: Path of the directory containing the images.

        Returns:
            Sum of the per-image elapsed times in seconds (each rounded to
            three decimals); ``0`` when the directory holds no images.

        Raises:
            OSError: The directory or an image file cannot be read.
            requests.RequestException: A request failed.
            ValueError: A reply body was not valid JSON.
        """
        count = 0
        image_paths = self._image_paths(file)
        if image_paths:
            with requests.Session() as session:
                # Retry failed connection attempts up to 5 times.
                adapter = requests.adapters.HTTPAdapter(max_retries=5)
                session.mount('http://', adapter)
                session.mount('https://', adapter)
                # Ask for a fresh connection per request (no keep-alive).
                session.headers['Connection'] = 'close'
                for path in image_paths:
                    with open(path, 'rb') as f:
                        data = f.read()
                    encoded_data = str(base64.b64encode(data), encoding='utf-8')
                    datas = {"imgdata": encoded_data}
                    start_time = time.time()
                    r = session.post(url, data=json.dumps(datas),
                                     headers={'Content-Type': 'application/json'},
                                     timeout=200)
                    # The parsed value is not used; parsing only verifies
                    # that the server answered with valid JSON.
                    json.loads(r.text)
                    end_time = time.time()
                    elapsed = round(end_time - start_time, 3)
                    count += elapsed
                    print(os.path.basename(path))
                    print(elapsed)
        print(count)
        return count

    def run(self):
        """Start ``THREAD_NUM`` worker threads, wait for all of them and
        print a summary of the run."""
        t1 = time.time()
        workers = [
            threading.Thread(target=self.test_onework, name="T" + str(i),
                             daemon=True)
            for i in range(THREAD_NUM)
        ]
        for t in workers:
            t.start()
        for t in workers:
            t.join()
        elapsed = time.time() - t1
        task_total = THREAD_NUM * ONE_WORKER_NUM
        # Guard both ratios so an empty or instantaneous run cannot divide
        # by zero.
        per_request = elapsed / task_total if task_total else 0.0
        per_second = task_total / elapsed if elapsed > 0 else 0.0
        print("===============压测结果===================")
        print("URL:", self.press_url)
        print("任务数量:", THREAD_NUM, "*", ONE_WORKER_NUM, "=", task_total)
        print("总耗时(秒):", elapsed)
        print("每次请求耗时(秒):", per_request)
        print("每秒承载请求数:", per_second)
        print("错误数量:", ERROR_NUM)
        print(INDEX)
# Shared run configuration and statistics. Presstest reads and writes these
# through module globals, so they are defined at import time rather than only
# when the file is executed as a script.
TIME_LENS = []  # elapsed seconds of every task that finished without error
INDEX = 0  # number of tasks started so far
THREAD_NUM = 5000  # total number of concurrent threads
ONE_WORKER_NUM = 1  # number of loop iterations per thread
LOOP_SLEEP = 0  # pause after each request, in seconds
ERROR_NUM = 0  # number of tasks that raised an exception

if __name__ == "__main__":
    # NOTE(review): 127.0.0.0 is probably a typo for 127.0.0.1 - confirm the
    # intended target host before running.
    press_url = 'http://127.0.0.0:8080'
    obj = Presstest(press_url)
    obj.run()