杰瑞科技汇

Tornado爬虫,异步高并发如何实现?

Tornado 是一个强大的 Web 框架,以其异步非阻塞 I/O模型而闻名,这个特性使其非常适合构建爬虫,因为它可以高效地处理大量网络请求,而不会像传统的同步爬虫(如 Requests + time.sleep())那样被 I/O 等待所阻塞,从而大大提高爬取效率。

Tornado爬虫,异步高并发如何实现?-图1
(图片来源网络,侵删)

为什么选择 Tornado 做爬虫?

  1. 高性能与高并发:Tornado 的核心是 IOLoop,它使用非阻塞网络 I/O,当你发起一个 HTTP 请求时,Tornado 不会等待响应,而是立即去处理其他任务,当响应到达时,IOLoop 会再回来处理它,这意味着一个线程可以同时处理成千上万个网络连接,这对于爬虫来说是巨大的优势。
  2. 异步编程模型:Tornado 的异步操作通过回调函数或现代的 async/await 语法(Python 3.5+)来实现,可以避免复杂的线程池管理。
  3. 内置 HTTP 客户端:Tornado 自带了一个功能完善的异步 HTTP 客户端 tornado.httpclient.AsyncHTTPClient,开箱即用,无需额外安装 requests 等同步库。
  4. 协程支持:结合 async/await,Tornado 的异步代码可以写得像同步代码一样清晰易读,这是现代异步编程的最佳实践。

核心概念:异步爬虫的工作流程

一个同步爬虫的工作流程是: 发起请求 -> 等待响应 -> 解析数据 -> 发起下一个请求

这个过程是串行的,大部分时间都花在了“等待响应”上。

一个 Tornado 异步爬虫的工作流程是:

  1. 发起请求:使用 AsyncHTTPClient 发起一个异步请求,并指定一个回调函数
  2. 立即返回:请求发出后,函数立即返回,IOLoop 继续执行其他任务(比如发起其他请求)。
  3. 响应到达:当服务器返回响应后,IOLoop 会调用之前指定的回调函数,并将响应对象作为参数传入。
  4. 处理响应:在回调函数中,解析响应数据,然后根据需要发起下一个新的异步请求。

这个流程可以同时进行成百上千个请求,极大地提高了效率。

Tornado爬虫,异步高并发如何实现?-图2
(图片来源网络,侵删)

实战示例:爬取豆瓣电影 Top 250

我们将分步构建一个爬虫,来爬取豆瓣电影 Top 250 的电影名称、评分和一句话评价。

步骤 1:环境准备

确保你已经安装了 Tornado,如果没有,请安装:

pip install tornado

步骤 2:编写一个基础的异步爬虫

这个示例将展示最核心的异步请求和回调逻辑。

# spider_basic.py
import tornado.httpclient
import tornado.ioloop
import tornado.web
import json
from urllib.parse import quote
# 定义一个回调函数来处理 HTTP 响应
def handle_response(response):
    """
    这个函数会在 HTTP 响应返回后被调用
    """
    if response.error:
        print(f"请求失败: {response.error}")
    else:
        # 解析 HTML (这里简单打印,实际应使用BeautifulSoup等)
        print(f"成功获取到页面,内容长度: {len(response.body)}")
        # 你可以添加解析逻辑,并决定是否继续爬取下一页
# 定义一个爬虫类,继承自tornado.web.RequestHandler
# 这样我们可以通过 HTTP 请求来启动爬虫,方便管理和控制
class SpiderHandler(tornado.web.RequestHandler):
    async def get(self):
        # 创建一个异步 HTTP 客户端
        http_client = tornado.httpclient.AsyncHTTPClient()
        # 豆瓣电影 Top 250 的 URL
        # 注意:豆瓣有反爬机制,直接爬取可能会被 ban,这里仅作演示
        url = "https://movie.douban.com/top250"
        try:
            # 发起异步 GET 请求
            # response = await http_client.fetch(url)
            # print(response.body) # 如果使用 await,就不需要回调函数了
            # 使用回调函数的方式 (更传统的 Tornado 风格)
            # fetch 的第二个参数就是回调函数
            http_client.fetch(url, callback=handle_response)
            self.write("爬虫任务已启动!请查看服务器控制台输出。")
        except Exception as e:
            self.write(f"启动爬虫时出错: {e}")
# 设置 Tornado 应用
def make_app():
    return tornado.web.Application([
        (r"/start", SpiderHandler),
    ])
if __name__ == "__main__":
    app = make_app()
    app.listen(8888)
    print("服务器启动在 http://localhost:8888")
    print("访问 http://localhost:8888/start 来启动爬虫")
    # 启动 IOLoop
    tornado.ioloop.IOLoop.current().start()

如何运行和测试:

  1. 保存代码为 spider_basic.py
  2. 在终端运行:python spider_basic.py
  3. 打开浏览器访问 http://localhost:8888/start
  4. 你会看到浏览器显示 "爬虫任务已启动!请查看服务器控制台输出。"
  5. 服务器控制台会打印出 "成功获取到页面,内容长度: xxxxx"。

代码解析:

  • tornado.httpclient.AsyncHTTPClient: 这是 Tornado 的异步 HTTP 客户端。
  • http_client.fetch(url, callback=handle_response): 这是核心。fetch 方法不会阻塞,它会立即将请求交给 IOLoop 处理,然后继续执行 get 方法的后续代码,当网络响应返回时,IOLoop 会调用 handle_response 函数。
  • handle_response(response): 回调函数,它接收一个 HTTPResponse 对象作为参数,你可以从中获取响应码、响应头、响应体等。

进阶示例:使用 async/awaitBeautifulSoup

现代 Python 推荐使用 async/await 语法,它能让异步代码更易于理解,我们使用 BeautifulSoup 来解析 HTML,并实现一个简单的翻页逻辑。

安装依赖

pip install beautifulsoup4

代码实现 (spider_advanced.py)

# spider_advanced.py
import tornado.httpclient
import tornado.ioloop
import tornado.web
from bs4 import BeautifulSoup
import asyncio
import json
from urllib.parse import quote
class DoubanTop250Spider:
    def __init__(self):
        self.base_url = "https://movie.douban.com/top250"
        self.client = tornado.httpclient.AsyncHTTPClient()
        self.movies_data = []
        self.current_page = 0
        self.total_pages = 10 # 豆瓣 Top 250 共10页
    async def fetch_page(self, page):
        """获取单个页面的内容"""
        start = page * 25
        url = f"{self.base_url}?start={start}"
        print(f"正在爬取第 {page + 1} 页: {url}")
        try:
            # 使用 await 来等待异步操作完成
            response = await self.client.fetch(url)
            return response.body
        except Exception as e:
            print(f"爬取第 {page + 1} 页失败: {e}")
            return None
    def parse_page(self, html_content):
        """解析页面内容,提取电影信息"""
        soup = BeautifulSoup(html_content, 'html.parser')
        movie_list = soup.find_all('div', class_='item')
        for movie in movie_list:
            title = movie.find('span', class_='title').text
            rating = movie.find('span', class_='rating_num').text
            quote_info = movie.find('span', class_='inq')
            quote = quote_info.text if quote_info else "无"
            self.movies_data.append({
                "title": title,
                "rating": rating,
                "quote": quote
            })
            print(f"已爬取: {title}, 评分: {rating}")
    async def run(self):
        """运行爬虫的主逻辑"""
        print("豆瓣 Top 250 爬虫开始运行...")
        for page in range(self.total_pages):
            html = await self.fetch_page(page)
            if html:
                self.parse_page(html)
            # 添加一个小的延迟,避免请求过于频繁被封禁
            # tornado.gen.sleep 是 Tornado 提供的异步 sleep
            await tornado.gen.sleep(1) 
        # 所有页面爬取完毕,保存结果
        self.save_to_json()
    def save_to_json(self):
        """将爬取的数据保存到 JSON 文件"""
        with open('douban_top250.json', 'w', encoding='utf-8') as f:
            json.dump(self.movies_data, f, ensure_ascii=False, indent=4)
        print("\n爬虫运行结束!")
        print(f"共爬取 {len(self.movies_data)} 部电影,数据已保存到 douban_top250.json")
# 同样,我们通过一个 Web 接口来启动爬虫
class StartSpiderHandler(tornado.web.RequestHandler):
    async def get(self):
        spider = DoubanTop250Spider()
        # 使用 tornado.ioloop.IOLoop.current().spawn_callback 来在后台运行协程
        # 这样不会阻塞 Web 服务器响应客户端请求
        tornado.ioloop.IOLoop.current().spawn_callback(spider.run)
        self.write("爬虫已在后台启动!请耐心等待,完成后数据将保存到 douban_top250.json 文件中。")
def make_app():
    return tornado.web.Application([
        (r"/start", StartSpiderHandler),
    ])
if __name__ == "__main__":
    app = make_app()
    app.listen(8888)
    print("服务器启动在 http://localhost:8888")
    print("访问 http://localhost:8888/start 来启动爬虫")
    tornado.ioloop.IOLoop.current().start()

代码解析:

  1. async defawaitfetch_pagerun 方法都使用了 async def 定义,成为协程,在 run 中,我们用 await 来等待 fetch_page 的结果,代码看起来是线性的,但执行过程是异步的。
  2. tornado.gen.sleep:这是一个异步的 sleep 函数,在 await tornado.gen.sleep(1) 时,IOLoop 会暂停当前协程1秒,然后去处理其他任务,而不是像 time.sleep(1) 那样阻塞整个线程。
  3. IOLoop.spawn_callback:这个方法非常重要,它允许我们在不阻塞主线程(Web 服务线程)的情况下,在后台启动一个新的协程,这样,即使爬虫需要运行很长时间,我们的 Web 服务器仍然可以正常响应其他用户的请求。
  4. BeautifulSoup:用于解析 HTML,比正则表达式更健壮、更方便。
  5. 数据保存:爬取完成后,我们将数据列表转换为 JSON 格式并保存到文件中。

重要注意事项与最佳实践

  1. 尊重 robots.txt:在爬取任何网站之前,请务必检查其 robots.txt 文件(https://www.douban.com/robots.txt),遵守网站的爬取规则。

  2. 设置 User-Agent:很多网站会拒绝没有 User-Agent 或使用默认 UA 的请求,在请求头中设置一个常见的浏览器 UA。

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    }
    response = await self.client.fetch(url, headers=headers)
  3. 处理反爬机制

    • IP 封禁:使用代理 IP 池,Tornado 的 AsyncHTTPClient 可以通过 AsyncHTTPClient.configure 来配置代理。
    • 频率限制:通过 tornado.gen.sleep() 控制请求间隔,模拟人类行为。
    • 验证码:如果遇到验证码,需要更复杂的处理,可能需要使用 Selenium 或第三方打码平台。
  4. 错误处理:网络请求是不可靠的,务必使用 try...except 来捕获可能发生的异常(如连接超时、DNS解析失败等)。

  5. 数据存储:对于大量数据,不要只存在内存中,可以定期将数据写入文件(如 JSON, CSV)或数据库(如 MongoDB, MySQL)。

  6. 并发控制:虽然 Tornado 可以处理高并发,但过于激进的并发可能会对目标服务器造成压力,也可能被封禁,可以通过信号量(asyncio.Semaphore)来限制同时进行的请求数量。

    semaphore = asyncio.Semaphore(10) # 限制最多10个并发
    async def fetch_page(self, page):
        async with semaphore:
            # ... 发起请求的代码 ...

Tornado 是一个构建高性能异步爬虫的绝佳选择,它的核心优势在于非阻塞 I/O,能够让你用少量的线程处理大量的网络请求,结合 async/await 语法和 BeautifulSoup 等工具,你可以编写出既高效又易于维护的爬虫程序,在编写爬虫时,务必遵守法律法规和网站的爬取规则,做一个有道德的爬虫开发者。

分享:
扫描分享到社交APP
上一篇
下一篇