程序员scholar 程序员scholar
首页
  • Java 基础

    • JavaSE
    • JavaIO
    • JavaAPI速查
  • Java 高级

    • JUC
    • JVM
    • Java新特性
    • 设计模式
  • Web 开发

    • Servlet
    • Java网络编程
  • 数据结构
  • HTTP协议
  • HTTPS协议
  • 计算机网络
  • Linux常用命令
  • Windows常用命令
  • SQL数据库

    • MySQL
    • MySQL速查
  • NoSQL数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • RabbitMQ
  • 服务器

    • Nginx
  • Python 基础

    • Python基础
  • Python 进阶

    • 装饰器与生成器
    • 异常处理
    • 标准库精讲
    • 模块与包
    • pip包管理工具
  • Spring框架

    • Spring6
    • SpringMVC
    • SpringBoot
    • SpringSecurity
  • SpringCould微服务

    • SpringCloud基础
    • 微服务之DDD架构思想
  • 日常必备

    • 开发常用工具包
    • Hutoll工具包
    • IDEA常用配置
    • 开发笔记
    • 日常记录
    • 项目部署
    • 网站导航
    • 产品学习
    • 英语学习
  • 代码管理

    • Maven
    • Git教程
    • Git小乌龟教程
  • 运维工具

    • Docker
    • Jenkins
    • Kubernetes
前端 (opens new window)
  • 算法笔记

    • 算法思想
    • 刷题笔记
  • 面试问题常见

    • 十大经典排序算法
    • 面试常见问题集锦
关于
GitHub (opens new window)
首页
  • Java 基础

    • JavaSE
    • JavaIO
    • JavaAPI速查
  • Java 高级

    • JUC
    • JVM
    • Java新特性
    • 设计模式
  • Web 开发

    • Servlet
    • Java网络编程
  • 数据结构
  • HTTP协议
  • HTTPS协议
  • 计算机网络
  • Linux常用命令
  • Windows常用命令
  • SQL数据库

    • MySQL
    • MySQL速查
  • NoSQL数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • RabbitMQ
  • 服务器

    • Nginx
  • Python 基础

    • Python基础
  • Python 进阶

    • 装饰器与生成器
    • 异常处理
    • 标准库精讲
    • 模块与包
    • pip包管理工具
  • Spring框架

    • Spring6
    • SpringMVC
    • SpringBoot
    • SpringSecurity
  • SpringCould微服务

    • SpringCloud基础
    • 微服务之DDD架构思想
  • 日常必备

    • 开发常用工具包
    • Hutoll工具包
    • IDEA常用配置
    • 开发笔记
    • 日常记录
    • 项目部署
    • 网站导航
    • 产品学习
    • 英语学习
  • 代码管理

    • Maven
    • Git教程
    • Git小乌龟教程
  • 运维工具

    • Docker
    • Jenkins
    • Kubernetes
前端 (opens new window)
  • 算法笔记

    • 算法思想
    • 刷题笔记
  • 面试问题常见

    • 十大经典排序算法
    • 面试常见问题集锦
关于
GitHub (opens new window)
npm

(进入注册为作者充电)

  • Python 基础

  • Python 进阶

  • Python爬虫

  • Scrapy 爬虫框架

    • Scrapy 框架核心:深入理解其工作流程
    • Scrapy 入门实战:从零到一构建你的第一个爬虫
    • Scrapy核心对象:Response超详细指南
    • Scrapy 核心配置与调试技巧
    • Scrapy Items与Pipeline数据管道
      • 一、Item:定义你的数据结构
        • 1.1 为什么要用 Item?
        • 1.2 如何定义 Item
        • 1.3 ItemLoader:更优雅地填充 Item
      • 二、Item Pipeline:处理你的数据流水线
        • 2.1 Pipeline 的核心方法
        • 2.2 如何启用 Pipeline
      • 三、实战:构建一条完整的数据处理流水线
        • 3.1 管道一:数据验证与清洗 (Validation & Cleaning)
        • 3.2 管道二:重复数据过滤 (Duplicates Filter)
        • 3.3 管道三:异步存入 MySQL 数据库
      • 四、Item Exporter: 轻量级的数据导出
      • 五、总结
    • Scrapy 图片与文件下载
    • Scrapy 模拟登录与Cookie处理
    • Scrapy CrawlSpider全站爬取
    • Scrapy 中间件:请求与响应的强大控制器
    • Scrapy-Redis:从单机到分布式集群
  • Python
  • Scrapy 爬虫框架
scholar
2025-07-23
目录

Scrapy Items与Pipeline数据管道

# Scrapy Items 与 Pipeline 数据管道

当你用 Scrapy 从网页中成功提取出数据后,接下来需要解决两个核心问题:

  1. 如何规范地组织这些数据?
  2. 如何对这些数据进行清洗、验证和持久化存储?

Scrapy 为此提供了两个强大的组件:Items 和 Item Pipelines。它们共同构成了 Scrapy 的数据处理流水线,实现了关注点分离(Separation of Concerns):Spider 专注于从网页中“提取”数据,而 Pipeline 专注于对提取到的数据进行“处理”。

# 一、Item:定义你的数据结构

虽然在 Spider 中直接 yield 一个 Python 字典非常方便,但在中大型项目中,我们强烈推荐使用 Item 来定义你的数据结构。

# 1.1 为什么要用 Item?

  • 结构清晰:Item 提供了一个固定的数据结构,像一个预定义的模板,让你的团队成员或未来的你一眼就能看出这个爬虫到底抓取了哪些字段。
  • 防止错误:它可以防止因字段名拼写错误(例如将 title 误写为 titel)而导致的静默失败。如果给一个未在 Item 中定义的字段赋值,Scrapy 会抛出 KeyError,帮助你快速定位问题。
  • 组件兼容:Scrapy 的许多内置组件和第三方扩展(如去重、数据导出、数据库存储等)都对 Item 对象有更好的支持。
  • 易于维护:当数据结构需要变更时,你只需要修改 items.py 文件,而不用在爬虫代码的多个地方进行查找和替换。

# 1.2 如何定义 Item

在项目 items.py 文件中,通过继承 scrapy.Item 并使用 scrapy.Field() 来定义你的数据字段。

# myproject/items.py

import scrapy

class DuanziItem(scrapy.Item):
    # scrapy.Field() 只是一个占位符,类似于声明,本身不接收任何参数。
    # 它的作用是为你的 Item 定义合法的字段,所有字段都应使用 Field() 声明。
    # 你还可以为字段添加元数据(metadata),供其他组件使用。
    
    title = scrapy.Field(
        # 示例:为 'title' 字段添加元数据,比如描述信息
        serializer=str,
        description="段子的标题"
    )
    content = scrapy.Field(
        serializer=str,
        description="段子的正文内容"
    )
    author = scrapy.Field()
    url = scrapy.Field()
    crawl_time = scrapy.Field()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 1.3 ItemLoader:更优雅地填充 Item

直接在 Spider 中通过 item['field'] = ... 的方式填充 Item 字段在逻辑复杂时会显得很乱。Scrapy 提供了一个更强大、更优雅的工具:ItemLoader。

ItemLoader 的主要优势在于将数据提取逻辑与数据填充逻辑分离,并通过**输入/输出处理器(Input/Output Processors)**对数据进行预处理和后处理。

如何使用 ItemLoader

# myproject/spiders/duanzi_spider.py
import scrapy
from myproject.items import DuanziItem
from scrapy.loader import ItemLoader
from itemloaders.processors import TakeFirst, MapCompose, Join
import datetime

class DuanziLoader(ItemLoader):
    # 为 ItemLoader 设置默认的输出处理器
    # TakeFirst() 会自动取列表中的第一个非空值,避免了到处写 .get()
    default_output_processor = TakeFirst()
    
    # 为 'content' 字段定义输入和输出处理器
    # MapCompose 会对输入列表中的每个元素依次应用函数(如去除首尾空格)
    # Join() 会将处理后的列表元素用指定的分隔符连接成一个字符串
    content_in = MapCompose(str.strip)
    content_out = Join('\n')

class DuanziSpider(scrapy.Spider):
    name = 'duanzi'
    start_urls = ['http://duanzixing.com/']

    def parse(self, response, **kwargs):
        article_list = response.xpath('//article[@class="excerpt"]')
        for article in article_list:
            # 1. 实例化 DuanziLoader
            # response 参数用于解析相对 URL
            loader = DuanziLoader(item=DuanziItem(), selector=article)
            
            # 2. 使用 add_xpath 或 add_css 添加数据,而不是 .get()
            loader.add_xpath('title', './header/h2/a/text()')
            loader.add_xpath('content', './p[@class="note"]/text()')
            loader.add_xpath('author', './/a[@class="author-name"]/text()', default='匿名')
            
            # 3. 使用 add_value 添加非提取的值
            loader.add_value('url', response.urljoin(article.xpath('./header/h2/a/@href').get()))
            loader.add_value('crawl_time', datetime.datetime.now())
            
            # 4. 调用 load_item() 生成填充好的 Item
            yield loader.load_item()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

# 二、Item Pipeline:处理你的数据流水线

当 Spider yield 一个 Item 对象后,这个 Item 会被发送到 Item Pipeline(数据管道)进行后续处理。一个项目可以定义多个 Pipeline,它们会像一条流水线一样,按顺序对 Item 进行加工。

Pipeline 的主要职责包括:

  • 数据清洗:清理HTML标签、修正格式、统一单位等。
  • 数据验证:检查必需字段是否存在、数据格式是否正确。
  • 去重:根据唯一标识符(如文章ID)过滤掉重复的 Item。
  • 持久化存储:将数据存入数据库、写入文件(CSV, JSON, XML)或发送到消息队列。

# 2.1 Pipeline 的核心方法

一个 Pipeline 是一个普通的 Python 类,但它有三个特殊的核心方法,Scrapy 会在特定时机自动调用它们:

  • open_spider(self, spider): 爬虫启动时调用,且仅调用一次。非常适合执行数据库连接、打开文件、加载配置等初始化操作。
  • process_item(self, item, spider): 每当 Spider yield 一个 Item 时,此方法被调用一次。这是 Pipeline 的核心,所有的数据处理逻辑都在这里编写。此方法必须 return item 或 raise DropItem()。如果返回 item,它将被传递给下一个(优先级更低的)Pipeline;如果 raise DropItem,该 item 的处理将在此终止。
  • close_spider(self, spider): 爬虫关闭时调用,且仅调用一次。适合执行数据库连接关闭、文件关闭、资源释放等收尾操作。

# 2.2 如何启用 Pipeline

默认情况下,Pipeline 是不工作的。你需要在 settings.py 文件中启用它,并为其分配一个 0-1000 的优先级。

# settings.py

# ITEM_PIPELINES 是一个字典
ITEM_PIPELINES = {
   # 键: 你的 Pipeline 类的完整路径
   # 值: 优先级(0-1000),数字越小,优先级越高,Item 会越先经过它
   'myproject.pipelines.DataValidationPipeline': 100,
   'myproject.pipelines.DuanziMysqlPipeline': 300,
   'myproject.pipelines.DuanziFilePipeline': 400,
}
1
2
3
4
5
6
7
8
9
10

# 三、实战:构建一条完整的数据处理流水线

下面我们通过三个例子,构建一条从验证、去重到存储的完整流水线。

# 3.1 管道一:数据验证与清洗 (Validation & Cleaning)

这个管道优先级最高,负责确保数据的基本质量。

# myproject/pipelines.py
from scrapy.exceptions import DropItem
import re

class DataValidationPipeline:
    def process_item(self, item, spider):
        # 检查 'title' 字段是否存在且不为空
        if not item.get('title'):
            raise DropItem(f"Missing title in {item}")
        
        # 清洗 content 字段:移除不必要的空白字符
        if item.get('content'):
            item['content'] = re.sub(r'\s+', ' ', item['content']).strip()
        
        # 如果爬虫是 duanzi_spider,则执行特定验证
        if spider.name == 'duanzi':
            if len(item.get('content', '')) < 10:
                raise DropItem(f"Content too short in {item}")
        
        return item
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 3.2 管道二:重复数据过滤 (Duplicates Filter)

这个管道负责根据唯一标识(如URL)来过滤掉已经抓取过的 Item。

# myproject/pipelines.py
from scrapy.exceptions import DropItem

class DuplicatesPipeline:
    def __init__(self):
        # 用于存储已见过的 item 的 URL
        self.urls_seen = set()

    def process_item(self, item, spider):
        # 使用 item 的 'url' 字段作为唯一标识
        if item['url'] in self.urls_seen:
            raise DropItem(f"Duplicate item found: {item['url']}")
        else:
            self.urls_seen.add(item['url'])
            return item
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 3.3 管道三:异步存入 MySQL 数据库

对于I/O密集型操作(如数据库写入),使用同步代码可能会阻塞 Scrapy 的事件循环,影响性能。Scrapy 基于 Twisted,天然支持异步操作。我们可以使用 twisted.enterprise.adbapi 来实现数据库连接池和异步写入。

首先,安装 pymysql:pip install pymysql

# myproject/pipelines.py
import pymysql
from twisted.enterprise import adbapi

class AsyncMysqlPipeline:
    def __init__(self, db_settings):
        # 创建数据库连接池
        self.db_pool = adbapi.ConnectionPool(
            'pymysql',  # 数据库驱动名
            **db_settings
        )
        print(">>>>>> 异步 MySQL 管道已启动 <<<<<<")

    @classmethod
    def from_crawler(cls, crawler):
        # 从 settings.py 中读取数据库配置
        db_settings = dict(
            host=crawler.settings.get('MYSQL_HOST', '127.0.0.1'),
            user=crawler.settings.get('MYSQL_USER', 'root'),
            password=crawler.settings.get('MYSQL_PASSWORD', ''),
            db=crawler.settings.get('MYSQL_DB', 'scrapy_db'),
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor, # 使用字典游标
        )
        return cls(db_settings)

    def process_item(self, item, spider):
        # runInteraction 会将实际的数据库操作放入线程池中异步执行
        query = self.db_pool.runInteraction(self.do_insert, item)
        # 添加错误处理回调
        query.addErrback(self.handle_error, item, spider)
        return item

    def do_insert(self, cursor, item):
        # 执行具体的插入操作
        # 使用 INSERT IGNORE 来避免因主键或唯一索引冲突而报错
        sql = "INSERT IGNORE INTO duanzi (title, content, author, url, crawl_time) VALUES (%s, %s, %s, %s, %s)"
        cursor.execute(sql, (
            item.get('title'),
            item.get('content'),
            item.get('author'),
            item.get('url'),
            item.get('crawl_time')
        ))

    def handle_error(self, failure, item, spider):
        # 处理异步插入时可能出现的错误
        spider.logger.error(f"Failed to insert item into MySQL: {failure}")

    def close_spider(self, spider):
        # 关闭数据库连接池
        self.db_pool.close()
        print(">>>>>> 异步 MySQL 管道已关闭 <<<<<<")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

你需要预先在 MySQL 中创建好数据库和表:

CREATE DATABASE scrapy_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE scrapy_db;
CREATE TABLE duanzi (
    id INT AUTO_INCREMENT PRIMARY KEY,
    title VARCHAR(255) NOT NULL,
    content TEXT,
    author VARCHAR(100),
    url VARCHAR(512) UNIQUE, -- 将 url 设为唯一索引,用于去重
    crawl_time DATETIME,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
1
2
3
4
5
6
7
8
9
10
11

# 四、Item Exporter: 轻量级的数据导出

如果你只是想简单地将数据保存为 JSON, CSV, 或 XML 格式的文件,而不想编写完整的 Pipeline,Scrapy 提供了 Item Exporter。

最简单的方式是通过命令行参数:

# 导出为 JSON Lines 格式(推荐)
scrapy crawl duanzi -o duanzi.jsonl

# 导出为 CSV 格式
scrapy crawl duanzi -o duanzi.csv

# 导出为 XML 格式
scrapy crawl duanzi -o duanzi.xml
1
2
3
4
5
6
7
8

Scrapy 会自动为你处理文件的创建、写入和关闭。这对于快速原型开发或简单的数据采集任务非常有用。

# 五、总结

  1. 始终使用 Item:即使是小项目,也建议使用 Item 来定义数据结构,这是一种良好的编程习惯。
  2. 拥抱 ItemLoader:使用 ItemLoader 和处理器来分离提取与填充逻辑,使 Spider 代码更干净、更易于维护。
  3. Pipeline 链式处理:将不同的处理逻辑拆分到不同的 Pipeline 中,每个 Pipeline 只做一件事(单一职责原则)。利用优先级来控制它们的执行顺序。
  4. 异步优先:对于数据库写入等 I/O 操作,优先考虑使用异步 Pipeline,以避免阻塞 Scrapy 的核心引擎,从而获得更好的爬取性能。
  5. 区分 Spider:在 Pipeline 中使用 if spider.name == '...' 来为不同的爬虫应用不同的数据处理逻辑。
  6. 轻量级任务用 Exporter:对于简单的数据导出需求,直接使用 scrapy crawl ... -o ... 命令,无需编写任何 Pipeline 代码。
编辑此页 (opens new window)
上次更新: 2025/07/27, 04:30:11
Scrapy 核心配置与调试技巧
Scrapy 图片与文件下载

← Scrapy 核心配置与调试技巧 Scrapy 图片与文件下载→

Theme by Vdoing | Copyright © 2019-2025 程序员scholar
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式