异步爬虫
目标:列举asyncio
和aiohttp
模块的常规用法代码
关于协程概念参考:https://blog.csdn.net/weixin_40743639/article/details/122394616?spm=1001.2014.3001.5502
一. 协程的基本原理
1. 定义协程
方式一:
import asyncio


async def execute(x):
    """Coroutine that simply prints its argument (returns None)."""
    print('Number:', x)


coroutine = execute(1)  # calling a coroutine function returns a coroutine object; nothing runs yet
print('Coroutine:', coroutine)  # shows the pending coroutine object
print('After calling execute')

# asyncio.run() (Python 3.7+) creates an event loop, drives the coroutine to
# completion and closes the loop; it replaces the now-deprecated
# get_event_loop() / run_until_complete() pair used in older tutorials.
asyncio.run(coroutine)
print('After calling loop')
运行效果:
Coroutine: <coroutine object execute at 0x00000225FF3263C8>
After calling execute
Number: 1
After calling loop
方式二:
将协程对象打包成任务对象,操作有点多余。
import asyncio


async def execute(x):
    """Coroutine that prints its argument and returns it."""
    print('Number:', x)
    return x


coroutine = execute(1)  # calling a coroutine function returns a coroutine object
print('Coroutine', coroutine)
print('After calling execute')

# Build a fresh loop explicitly: calling get_event_loop() with no running
# loop is deprecated since Python 3.10 and removed later.
loop = asyncio.new_event_loop()
task = loop.create_task(coroutine)  # wrap the coroutine into a Task (pending state)
print('Task:', task)  # pending
loop.run_until_complete(task)  # run the loop until the task finishes
print('Task:', task)  # finished; result is now available
print('After calling loop')
loop.close()  # release the loop's resources (the original leaked it)
运行效果:
Coroutine <coroutine object execute at 0x0000022DE4DE54C8>
After calling execute
Task: <Task pending coro=<execute() running at c:/Users/Apphao/Desktop/Python/cuiqingcai/6 异步爬虫/6.1 协程的基本原理/AsyncTest/03_demo3.py:3>>
Number: 1
Task: <Task finished coro=<execute() done, defined at c:/Users/Apphao/Desktop/Python/cuiqingcai/6 异步爬虫/6.1 协程的基本原理/AsyncTest/03_demo3.py:3> result=1>
After calling loop
方式三:
如果要添加回调函数,使用这种方法
import asyncio


async def execute(x):
    """Coroutine that prints its argument and returns it."""
    print('Number:', x)
    return x


coroutine = execute(1)  # calling a coroutine function returns a coroutine object
print('Coroutine:', coroutine)
print('After calling execute')

# BUG FIX: the original called asyncio.ensure_future() BEFORE obtaining a
# loop; on modern Python (3.10+) that raises "no current event loop".
# Create and install the loop first so ensure_future() can find it.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
task = asyncio.ensure_future(coroutine)  # wrap into a Future/Task (pending)
print('Task:', task)  # pending
loop.run_until_complete(task)  # run until the task completes
print('Task:', task)  # finished
print('After calling loop')
loop.close()
运行效果:
Coroutine: <coroutine object execute at 0x00000225050154C8>
After calling execute
Task: <Task pending coro=<execute() running at c:/Users/Apphao/Desktop/Python/cuiqingcai/6 异步爬虫/6.1 协程的基本原理/AsyncTest/04_demo4.py:3>>
Number: 1
Task: <Task finished coro=<execute() done, defined at c:/Users/Apphao/Desktop/Python/cuiqingcai/6 异步爬虫/6.1 协程的基本原理/AsyncTest/04_demo4.py:3> result=1>
After calling loop
2. 绑定回调
import asyncio
import requests


async def request():
    """Fetch https://www.baidu.com and return the requests Response.

    NOTE(review): requests.get() is a *blocking* call, so this coroutine never
    actually yields control while waiting — the demo illustrates the Task /
    done-callback API, not real asynchronous I/O. Use aiohttp for the latter.
    """
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status


def callback(task):
    """Done-callback: receives the finished Task and prints its result."""
    print('Status:', task.result())


coroutine = request()  # calling the coroutine function returns a coroutine object
task = asyncio.ensure_future(coroutine)  # wrap it into a Future/Task object
task.add_done_callback(callback)  # fires once the task completes
print('Task:', task)  # pending state
loop = asyncio.get_event_loop()  # obtain the event loop
loop.run_until_complete(task)  # schedule the task and run until it finishes
print('Task:', task)  # finished state
运行效果:
Task: <Task pending coro=<request() running at c:/Users/Apphao/Desktop/Python/cuiqingcai/6 异步爬虫/6.1 协程的基本原理/AsyncTest/05_demo5.py:4> cb=[callback() at c:/Users/Apphao/Desktop/Python/cuiqingcai/6 异步爬虫/6.1 协程的基本原理/AsyncTest/05_demo5.py:9]>
Status: <Response [200]>
Task: <Task finished coro=<request() done, defined at c:/Users/Apphao/Desktop/Python/cuiqingcai/6 异步爬虫/6.1 协程的基本原理/AsyncTest/05_demo5.py:4> result=<Response [200]>>
也可以不通过回调函数获得协程任务的执行结果
import asyncio
import requests


async def request():
    """Fetch https://www.baidu.com and return the requests Response.

    NOTE(review): requests.get() blocks the event loop; this demo only shows
    how to read a Task's return value via task.result() without a callback.
    """
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status


coroutine = request()  # calling the coroutine function returns a coroutine object
task = asyncio.ensure_future(coroutine)  # wrap it into a Future/Task object
print('Task:', task)  # pending state
loop = asyncio.get_event_loop()  # obtain the event loop
loop.run_until_complete(task)  # run until the task finishes
print('Task:', task)  # finished state
print('Task Result:', task.result())  # the coroutine's return value
运行效果:
Task: <Task pending coro=<request() running at c:/Users/Apphao/Desktop/Python/cuiqingcai/6 异步爬虫/6.1 协程的基本原理/AsyncTest/06_demo6.py:4>>
Task: <Task finished coro=<request() done, defined at c:/Users/Apphao/Desktop/Python/cuiqingcai/6 异步爬虫/6.1 协程的基本原理/AsyncTest/06_demo6.py:4> result=<Response [200]>>
Task Result: <Response [200]>
3. 多任务协程
import asyncio
import requests


async def request():
    """Fetch https://www.baidu.com and return the requests Response.

    NOTE(review): because requests.get() blocks, these five "concurrent"
    tasks actually run one after another — the demo only shows the
    multi-task API shape (list of tasks + asyncio.wait).
    """
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status


tasks = [asyncio.ensure_future(request()) for _ in range(5)]  # build a list of Task objects
print('Tasks:', tasks)  # all pending at this point
loop = asyncio.get_event_loop()  # obtain the event loop
loop.run_until_complete(asyncio.wait(tasks))  # run until every task completes
for task in tasks:
    print('Task Result:', task.result())  # each Task's return value
运行效果:
Tasks: [<Task pending coro=<request() running at c:/Users/Apphao/Desktop/Python/cuiqingcai/6 异步爬虫/6.1 协程的基本原理/AsyncTest/07_demo7.py:4>>, <Task pending coro=<request() running at c:/Users/Apphao/Desktop/Python/cuiqingcai/6 异步
爬虫/6.1 协程的基本原理/AsyncTest/07_demo7.py:4>>, <Task pending coro=<request() running at c:/Users/Apphao/Desktop/Python/cuiqingcai/6 异步爬虫/6.1 协程的基本原理/AsyncTest/07_demo7.py:4>>, <Task pending coro=<request() running at c:/Users/Apphao/Desktop/Python/cuiqingcai/6 异步爬虫/6.1 协程的基本原理/AsyncTest/07_demo7.py:4>>, <Task pending coro=<request() running at c:/Users/Apphao/Desktop/Python/cuiqingcai/6 异步爬虫/6.1 协程的基本原理/AsyncTest/07_demo7.py:4>>]
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
4. 使用aiohttp
import asyncio
import aiohttp
import time

start = time.time()


async def get(url):
    """GET *url* with aiohttp and return the ClientResponse.

    NOTE(review): a new ClientSession is created per call; real code should
    share one session to get connection pooling.
    """
    session = aiohttp.ClientSession()
    response = await session.get(url)  # await at the point that would block; the loop switches tasks
    await response.text()  # reading the body is asynchronous too, so it must be awaited
    await session.close()
    return response


async def request():
    """Print progress markers around one asynchronous GET."""
    url = 'https://httpbin.org/delay/5'
    print('Waiting for', url)
    response = await get(url)  # awaiting a coroutine yields until it completes
    print('Get response from', url, 'response', response)


tasks = [asyncio.ensure_future(request()) for _ in range(3)]  # three concurrent tasks
loop = asyncio.get_event_loop()  # obtain the event loop
loop.run_until_complete(asyncio.wait(tasks))  # total time ≈ one request, not three

end = time.time()
print('Cost time:', end - start)
运行效果:
Waiting for https://httpbin.org/delay/5
Waiting for https://httpbin.org/delay/5
Waiting for https://httpbin.org/delay/5
Get response from https://httpbin.org/delay/5 response <ClientResponse(https://httpbin.org/delay/5) [200 OK]>
<CIMultiDictProxy('Date': 'Wed, 16 Mar 2022 13:16:38 GMT', 'Content-Type': 'application/json', 'Content-Length': '360', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')>
Get response from https://httpbin.org/delay/5 response <ClientResponse(https://httpbin.org/delay/5) [200 OK]>
<CIMultiDictProxy('Date': 'Wed, 16 Mar 2022 13:16:38 GMT', 'Content-Type': 'application/json', 'Content-Length': '360', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')>
Get response from https://httpbin.org/delay/5 response <ClientResponse(https://httpbin.org/delay/5) [200 OK]>
<CIMultiDictProxy('Date': 'Wed, 16 Mar 2022 13:16:39 GMT', 'Content-Type': 'application/json', 'Content-Length': '360', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')>
Cost time: 9.281606435775757
测试一下aiohttp
多个协程访问百度的速度
import asyncio
import aiohttp
import time


def test(number):
    """Time *number* concurrent GETs of baidu.com using aiohttp and print it."""
    start = time.time()

    async def get(url):
        # One session per request — acceptable for a throwaway benchmark,
        # wasteful otherwise (no connection reuse).
        session = aiohttp.ClientSession()
        response = await session.get(url)  # await the network round-trip
        await response.text()  # drain the body asynchronously
        await session.close()
        return response

    async def request():
        url = 'https://www.baidu.com/'
        await get(url)  # result discarded; only timing matters here

    tasks = [asyncio.ensure_future(request()) for _ in range(number)]  # `number` concurrent tasks
    loop = asyncio.get_event_loop()  # NOTE(review): reuses the same default loop across successive test() calls
    loop.run_until_complete(asyncio.wait(tasks))  # run all tasks to completion
    end = time.time()
    print('Number:', number, 'Cost time:', end - start)


for number in [1, 3, 5, 10, 15, 30, 75, 100, 200, 500]:
    test(number)
运行效果:
Number: 1 Cost time: 0.12730693817138672
Number: 3 Cost time: 0.09128069877624512
Number: 5 Cost time: 0.08041119575500488
Number: 10 Cost time: 0.15501713752746582
Number: 15 Cost time: 0.1743018627166748
Number: 30 Cost time: 0.25256872177124023
Number: 75 Cost time: 0.7528872489929199
Number: 100 Cost time: 0.36460232734680176
Number: 200 Cost time: 0.9137499332427979
Number: 500 Cost time: 3.35489559173584
二. aiohttp的使用
1. 基本实例
import aiohttp
import asyncio


async def fetch(session, url):
    """Return (body_text, status_code) for a GET of *url* using *session*."""
    async with session.get(url) as response:  # async context manager releases the response
        return await response.text(), response.status


async def main():
    """Fetch baidu.com once and print a preview of the body plus the status."""
    async with aiohttp.ClientSession() as session:  # session is closed automatically on exit
        html, status = await fetch(session, 'https://www.baidu.com/')
        print(f'html: {html[:100]}...')
        print(f'status: {status}')


if __name__ == '__main__':
    # asyncio.run(main())  # Python 3.7+ one-liner equivalent of the two lines below
    loop = asyncio.get_event_loop()  # obtain the event loop
    loop.run_until_complete(main())  # run the coroutine to completion
运行效果:
html: <html>
<head>
<script>
location.replace(location.href.replace("https://","http://"));
</scri...
status: 200
2. URL参数设置
import aiohttp
import asyncio


async def main():
    """GET https://httpbin.org/get with query parameters and print the body."""
    query = {'name': 'apphao', 'age': 25}
    async with aiohttp.ClientSession() as client:
        # `params=` appends the dict as a URL query string.
        async with client.get('https://httpbin.org/get', params=query) as resp:
            body = await resp.text()
            print(body)


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())  # build the loop, run main() to completion
运行效果:
{
"args": {
"age": "25",
"name": "apphao"
},
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Host": "httpbin.org",
"User-Agent": "Python/3.7 aiohttp/3.8.1",
"X-Amzn-Trace-Id": "Root=1-6231e602-74a6d4d74b9a70c761705a6c"
},
"origin": "218.82.172.215",
"url": "https://httpbin.org/get?name=apphao&age=25"
}
3. POST请求
提交form表单
import aiohttp
import asyncio


async def main():
    """POST form-encoded data to httpbin and print the response body."""
    payload = {'name': 'apphao', 'age': 25}
    async with aiohttp.ClientSession() as client:
        # `data=` sends the dict as application/x-www-form-urlencoded.
        async with client.post('https://httpbin.org/post', data=payload) as resp:
            body = await resp.text()
            print(body)


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())  # build the loop, run main() to completion
运行效果:
{
"args": {},
"data": "",
"files": {},
"form": {
"age": "25",
"name": "apphao"
},
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Content-Length": "18",
"Content-Type": "application/x-www-form-urlencoded",
"Host": "httpbin.org",
"User-Agent": "Python/3.7 aiohttp/3.8.1",
"X-Amzn-Trace-Id": "Root=1-6231e65a-1521583f3b18bb4d0b95dc45"
},
"json": null,
"origin": "218.82.172.215",
"url": "https://httpbin.org/post"
}
提交json字符串
import aiohttp
import asyncio


async def main():
    """POST a JSON body to httpbin and print the response body."""
    payload = {'name': 'apphao', 'age': 25}
    async with aiohttp.ClientSession() as client:
        # `json=` serializes the dict and sets Content-Type: application/json.
        async with client.post('https://httpbin.org/post', json=payload) as resp:
            body = await resp.text()
            print(body)


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())  # build the loop, run main() to completion
运行效果:
{
"args": {},
"data": "{\"name\": \"apphao\", \"age\": 25}",
"files": {},
"form": {},
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Content-Length": "29",
"Content-Type": "application/json",
"Host": "httpbin.org",
"User-Agent": "Python/3.7 aiohttp/3.8.1",
"X-Amzn-Trace-Id": "Root=1-6231e7f4-74d9b029347afc9f20a21534"
},
"json": {
"age": 25,
"name": "apphao"
},
"origin": "218.82.172.215",
"url": "https://httpbin.org/post"
}
4. 响应
import aiohttp
import asyncio


async def main():
    """POST a form and show the different ways to inspect the response."""
    form = {'name': 'apphao', 'age': 25}
    async with aiohttp.ClientSession() as client:
        async with client.post('http://www.httpbin.org/post', data=form) as resp:
            # Plain attributes are available synchronously.
            print('status:', resp.status)
            print('headers:', resp.headers)
            # Body accessors perform I/O, so each one must be awaited.
            print('body:', await resp.text())
            print('bytes:', await resp.read())
            print('json:', await resp.json())


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())
运行效果:
status: 200
headers: <CIMultiDictProxy('Date': 'Wed, 16 Mar 2022 13:41:52 GMT', 'Content-Type': 'application/json', 'Content-Length': '510', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')>
body: {
"args": {},
"data": "",
"files": {},
"form": {
"age": "25",
"name": "apphao"
},
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Content-Length": "18",
"Content-Type": "application/x-www-form-urlencoded",
"Host": "www.httpbin.org",
"User-Agent": "Python/3.7 aiohttp/3.8.1",
"X-Amzn-Trace-Id": "Root=1-6231e920-42d270f01ad9475f4392ca31"
},
"json": null,
"origin": "218.82.172.215",
"url": "http://www.httpbin.org/post"
}
bytes: b'{\n "args": {}, \n "data": "", \n "files": {}, \n "form": {\n "age": "25", \n "name": "apphao"\n }, \n "headers": {\n "Accept": "*/*", \n "Accept-Encoding": "gzip, deflate", \n "Content-Length": "18", \n "Content-Type": "application/x-www-form-urlencoded", \n "Host": "www.httpbin.org", \n "User-Agent": "Python/3.7 aiohttp/3.8.1", \n "X-Amzn-Trace-Id": "Root=1-6231e920-42d270f01ad9475f4392ca31"\n }, \n "json": null, \n "origin": "218.82.172.215", \n "url": "http://www.httpbin.org/post"\n}\n'
json: {'args': {}, 'data': '', 'files': {}, 'form': {'age': '25', 'name': 'apphao'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Content-Length': '18', 'Content-Type': 'application/x-www-form-urlencoded', 'Host': 'www.httpbin.org', 'User-Agent': 'Python/3.7 aiohttp/3.8.1', 'X-Amzn-Trace-Id': 'Root=1-6231e920-42d270f01ad9475f4392ca31'}, 'json': None, 'origin': '218.82.172.215', 'url': 'http://www.httpbin.org/post'}
5. 超时设置
import aiohttp
import asyncio


async def main():
    """Demonstrate a 1-second total timeout; the request below exceeds it."""
    limit = aiohttp.ClientTimeout(total=1)  # total budget for the whole request
    async with aiohttp.ClientSession(timeout=limit) as client:
        async with client.get('https://www.httpbin.org/get') as resp:
            print('status:', resp.status)


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())
运行效果:
Traceback (most recent call last):
File "c:/Users/Apphao/Desktop/Python/cuiqingcai/6 异步爬虫/6.1 协程的基本原理/AsyncTest/21_demo20.py", line 11, in <module>
asyncio.get_event_loop().run_until_complete(main())
File "C:\Users\Apphao\anaconda3\lib\asyncio\base_events.py", line 583, in run_until_complete
return future.result()
File "c:/Users/Apphao/Desktop/Python/cuiqingcai/6 异步爬虫/6.1 协程的基本原理/AsyncTest/21_demo20.py", line 7, in main
async with session.get('https://www.httpbin.org/get') as response:
File "C:\Users\Apphao\anaconda3\lib\site-packages\aiohttp\client.py", line 1138, in __aenter__
self._resp = await self._coro
File "C:\Users\Apphao\anaconda3\lib\site-packages\aiohttp\client.py", line 634, in _request
break
File "C:\Users\Apphao\anaconda3\lib\site-packages\aiohttp\helpers.py", line 721, in __exit__
raise asyncio.TimeoutError from None
concurrent.futures._base.TimeoutError
6. 并发限制
import asyncio
import aiohttp

CONCURRENCY = 5  # at most 5 requests in flight at once
URL = 'https://www.baidu.com'

semaphore = asyncio.Semaphore(CONCURRENCY)  # counting semaphore enforcing the limit
session = None  # shared ClientSession, created inside main()


async def scrape_api():
    """Fetch URL under the semaphore and return the response body text."""
    async with semaphore:  # waits here while 5 requests are already running
        print('scraping', URL)
        async with session.get(URL) as response:
            await asyncio.sleep(1)  # artificial delay to make the throttling visible
            return await response.text()


async def main():
    """Launch 20 throttled fetches and wait for all of them."""
    global session
    session = aiohttp.ClientSession()
    try:
        scrape_index_tasks = [asyncio.ensure_future(scrape_api()) for _ in range(20)]
        await asyncio.gather(*scrape_index_tasks)
    finally:
        # BUG FIX: the original never closed the session, leaking the
        # connector and triggering "Unclosed client session" warnings.
        await session.close()


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())
三. aiohttp异步爬取实战
import asyncio
import aiohttp
import logging
import json
from motor.motor_asyncio import AsyncIOMotorClient
import re
import time
from numpy import *
# Logging: timestamped INFO-level messages.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s: %(message)s')

INDEX_URL = 'https://spa5.scrape.center/api/book/?limit=18&offset={offset}'  # paginated index endpoint
DETAIL_URL = 'https://spa5.scrape.center/api/book/{id}'  # per-book detail endpoint
PAGE_SIZE = 18  # items per index page
PAGE_NUMBER = 100  # number of index pages to scrape
CONCURRENCY = 5  # maximum simultaneous requests
MONGO_CONNECTION_STRING = 'mongodb://localhost:27017'  # MongoDB server address
MONGO_DB_NAME = 'books'  # database name
MONGO_COLLECTION_NAME = 'books2'  # collection name

# Async MongoDB client (motor): every DB operation returns an awaitable.
client = AsyncIOMotorClient(MONGO_CONNECTION_STRING)
db = client[MONGO_DB_NAME]
collection = db[MONGO_COLLECTION_NAME]

# Shared concurrency primitives: the semaphore caps in-flight requests;
# `session` is populated inside main() before any task runs.
semaphore = asyncio.Semaphore(CONCURRENCY)
session = None
async def scrape_api(url):
    """GET *url* (throttled by the shared semaphore) and return parsed JSON.

    Retries up to 10 times on aiohttp client errors. If every attempt fails
    the loop falls through and the function implicitly returns None — callers
    must tolerate a None result.
    """
    async with semaphore:
        for c in range(10):  # retry loop: up to 10 attempts
            try:
                logging.info('scraping %s', url)
                async with session.get(url) as response:
                    return await response.json()  # success: hand back the decoded JSON
            except aiohttp.ClientError:
                # Log with traceback and fall through to the next attempt.
                logging.error('error occurred while scraping %s', url, exc_info=True)
# Fetch one index (listing) page.
async def scrape_index(page):
    """Fetch index page *page* (1-based) and return its JSON payload."""
    offset = PAGE_SIZE * (page - 1)  # translate page number into API offset
    return await scrape_api(INDEX_URL.format(offset=offset))
# Collapse runs of multiple spaces in a string into a single space.
def spaceReplace(s):
    """Return *s* with every run of two or more spaces reduced to one space."""
    return re.sub(' {2,}', ' ', s)
# Parse one book's detail JSON into the record we persist.
def parse_detail(data):
    """Extract id, name, authors, score and cover from a detail-API dict.

    The authors list is joined into one string, trimmed, and runs of spaces
    are collapsed to a single space.
    """
    # BUG FIX: the API returns null `authors` for some books; the original
    # passed None straight to ' '.join() and raised TypeError.
    authors = data.get('authors') or []
    result = {}
    result['id'] = data.get('id')
    result['name'] = data.get('name')
    # Same normalization the spaceReplace helper performs: squeeze spaces.
    result['authors'] = re.sub(' +', ' ', ' '.join(authors).strip())
    result['score'] = data.get('score')
    result['cover'] = data.get('cover')
    return result
# Fetch and persist one book's detail record.
async def scrape_detail(id):
    """Scrape the detail page for *id*, parse it, and store it in MongoDB."""
    detail_json = await scrape_api(DETAIL_URL.format(id=id))  # fetch the detail JSON
    record = parse_detail(detail_json)  # extract the fields we keep
    await save_data(record)  # upsert into the collection
# Upsert one record into MongoDB.
async def save_data(data):
    """Upsert *data* into the books collection, keyed by its 'id' field."""
    # BUG FIX: motor's update_one() returns a coroutine; the original did not
    # await it, so the write was never actually executed.
    await collection.update_one({
        'id': data.get('id')
    }, {
        '$set': data
    }, upsert=True)
    logging.info('save data success: %s', data.get('name'))
# Entry coroutine for the crawl.
async def main():
    """Collect all book ids from the index pages, then scrape every detail page."""
    global session
    session = aiohttp.ClientSession()
    # Phase 1: fetch every index page concurrently and gather their JSON.
    index_tasks = [asyncio.ensure_future(scrape_index(page))
                   for page in range(1, PAGE_NUMBER + 1)]
    index_results = await asyncio.gather(*index_tasks)
    # Collect book ids, skipping pages whose fetch ultimately failed (None).
    ids = [item.get('id')
           for page_data in index_results if page_data
           for item in page_data.get('results')]
    # Phase 2: scrape each detail page concurrently.
    detail_tasks = [asyncio.ensure_future(scrape_detail(book_id)) for book_id in ids]
    await asyncio.wait(detail_tasks)
    await session.close()
if __name__ == '__main__':
    start = time.time()
    # Runs main() on the default loop and times the whole crawl.
    # NOTE(review): asyncio.run() is the modern replacement, but it is not a
    # safe drop-in here — the module-level Semaphore may bind the default loop
    # on Python < 3.10; switching requires creating the primitives in main().
    asyncio.get_event_loop().run_until_complete(main())
    end = time.time()
    logging.info('Cost time is %s', end - start)