Scrapy Items与Pipeline数据管道
# Scrapy Items 与 Pipeline 数据管道
当你用 Scrapy 从网页中成功提取出数据后,接下来需要解决两个核心问题:
- 如何规范地组织这些数据?
- 如何对这些数据进行清洗、验证和持久化存储?
Scrapy 为此提供了两个强大的组件:Items
和 Item Pipelines
。它们共同构成了 Scrapy 的数据处理流水线,实现了关注点分离(Separation of Concerns):Spider 专注于从网页中“提取”数据,而 Pipeline 专注于对提取到的数据进行“处理”。
# 一、Item:定义你的数据结构
虽然在 Spider 中直接 yield
一个 Python 字典非常方便,但在中大型项目中,我们强烈推荐使用 Item
来定义你的数据结构。
# 1.1 为什么要用 Item?
- 结构清晰:
Item
提供了一个固定的数据结构,像一个预定义的模板,让你的团队成员或未来的你一眼就能看出这个爬虫到底抓取了哪些字段。 - 防止错误:它可以防止因字段名拼写错误(例如将
title
误写为titel
)而导致的静默失败。如果给一个未在Item
中定义的字段赋值,Scrapy 会抛出KeyError
,帮助你快速定位问题。 - 组件兼容:Scrapy 的许多内置组件和第三方扩展(如去重、数据导出、数据库存储等)都对
Item
对象有更好的支持。 - 易于维护:当数据结构需要变更时,你只需要修改
items.py
文件,而不用在爬虫代码的多个地方进行查找和替换。
# 1.2 如何定义 Item
在项目 items.py
文件中,通过继承 scrapy.Item
并使用 scrapy.Field()
来定义你的数据字段。
# myproject/items.py
import scrapy
class DuanziItem(scrapy.Item):
# scrapy.Field() 只是一个占位符,类似于声明,本身不接收任何参数。
# 它的作用是为你的 Item 定义合法的字段,所有字段都应使用 Field() 声明。
# 你还可以为字段添加元数据(metadata),供其他组件使用。
title = scrapy.Field(
# 示例:为 'title' 字段添加元数据,比如描述信息
serializer=str,
description="段子的标题"
)
content = scrapy.Field(
serializer=str,
description="段子的正文内容"
)
author = scrapy.Field()
url = scrapy.Field()
crawl_time = scrapy.Field()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 1.3 ItemLoader:更优雅地填充 Item
直接在 Spider 中通过 item['field'] = ...
的方式填充 Item
字段在逻辑复杂时会显得很乱。Scrapy 提供了一个更强大、更优雅的工具:ItemLoader
。
ItemLoader
的主要优势在于将数据提取逻辑与数据填充逻辑分离,并通过**输入/输出处理器(Input/Output Processors)**对数据进行预处理和后处理。
如何使用 ItemLoader
# myproject/spiders/duanzi_spider.py
import scrapy
from myproject.items import DuanziItem
from scrapy.loader import ItemLoader
from itemloaders.processors import TakeFirst, MapCompose, Join
import datetime
class DuanziLoader(ItemLoader):
# 为 ItemLoader 设置默认的输出处理器
# TakeFirst() 会自动取列表中的第一个非空值,避免了到处写 .get()
default_output_processor = TakeFirst()
# 为 'content' 字段定义输入和输出处理器
# MapCompose 会对输入列表中的每个元素依次应用函数(如去除首尾空格)
# Join() 会将处理后的列表元素用指定的分隔符连接成一个字符串
content_in = MapCompose(str.strip)
content_out = Join('\n')
class DuanziSpider(scrapy.Spider):
name = 'duanzi'
start_urls = ['http://duanzixing.com/']
def parse(self, response, **kwargs):
article_list = response.xpath('//article[@class="excerpt"]')
for article in article_list:
# 1. 实例化 DuanziLoader
# response 参数用于解析相对 URL
loader = DuanziLoader(item=DuanziItem(), selector=article)
# 2. 使用 add_xpath 或 add_css 添加数据,而不是 .get()
loader.add_xpath('title', './header/h2/a/text()')
loader.add_xpath('content', './p[@class="note"]/text()')
loader.add_xpath('author', './/a[@class="author-name"]/text()', default='匿名')
# 3. 使用 add_value 添加非提取的值
loader.add_value('url', response.urljoin(article.xpath('./header/h2/a/@href').get()))
loader.add_value('crawl_time', datetime.datetime.now())
# 4. 调用 load_item() 生成填充好的 Item
yield loader.load_item()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 二、Item Pipeline:处理你的数据流水线
当 Spider yield
一个 Item
对象后,这个 Item
会被发送到 Item Pipeline
(数据管道)进行后续处理。一个项目可以定义多个 Pipeline,它们会像一条流水线一样,按顺序对 Item
进行加工。
Pipeline
的主要职责包括:
- 数据清洗:清理HTML标签、修正格式、统一单位等。
- 数据验证:检查必需字段是否存在、数据格式是否正确。
- 去重:根据唯一标识符(如文章ID)过滤掉重复的
Item
。 - 持久化存储:将数据存入数据库、写入文件(CSV, JSON, XML)或发送到消息队列。
# 2.1 Pipeline 的核心方法
一个 Pipeline
是一个普通的 Python 类,但它有三个特殊的核心方法,Scrapy 会在特定时机自动调用它们:
open_spider(self, spider)
: 爬虫启动时调用,且仅调用一次。非常适合执行数据库连接、打开文件、加载配置等初始化操作。process_item(self, item, spider)
: 每当 Spideryield
一个Item
时,此方法被调用一次。这是Pipeline
的核心,所有的数据处理逻辑都在这里编写。此方法必须return item
或raise DropItem()
。如果返回item
,它将被传递给下一个(优先级更低的)Pipeline;如果raise DropItem
,该item
的处理将在此终止。close_spider(self, spider)
: 爬虫关闭时调用,且仅调用一次。适合执行数据库连接关闭、文件关闭、资源释放等收尾操作。
# 2.2 如何启用 Pipeline
默认情况下,Pipeline
是不工作的。你需要在 settings.py
文件中启用它,并为其分配一个 0-1000 的优先级。
# settings.py
# ITEM_PIPELINES 是一个字典
ITEM_PIPELINES = {
# 键: 你的 Pipeline 类的完整路径
# 值: 优先级(0-1000),数字越小,优先级越高,Item 会越先经过它
'myproject.pipelines.DataValidationPipeline': 100,
'myproject.pipelines.DuanziMysqlPipeline': 300,
'myproject.pipelines.DuanziFilePipeline': 400,
}
2
3
4
5
6
7
8
9
10
# 三、实战:构建一条完整的数据处理流水线
下面我们通过三个例子,构建一条从验证、去重到存储的完整流水线。
# 3.1 管道一:数据验证与清洗 (Validation & Cleaning)
这个管道优先级最高,负责确保数据的基本质量。
# myproject/pipelines.py
from scrapy.exceptions import DropItem
import re
class DataValidationPipeline:
def process_item(self, item, spider):
# 检查 'title' 字段是否存在且不为空
if not item.get('title'):
raise DropItem(f"Missing title in {item}")
# 清洗 content 字段:移除不必要的空白字符
if item.get('content'):
item['content'] = re.sub(r'\s+', ' ', item['content']).strip()
# 如果爬虫是 duanzi_spider,则执行特定验证
if spider.name == 'duanzi':
if len(item.get('content', '')) < 10:
raise DropItem(f"Content too short in {item}")
return item
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 3.2 管道二:重复数据过滤 (Duplicates Filter)
这个管道负责根据唯一标识(如URL)来过滤掉已经抓取过的 Item
。
# myproject/pipelines.py
from scrapy.exceptions import DropItem
class DuplicatesPipeline:
def __init__(self):
# 用于存储已见过的 item 的 URL
self.urls_seen = set()
def process_item(self, item, spider):
# 使用 item 的 'url' 字段作为唯一标识
if item['url'] in self.urls_seen:
raise DropItem(f"Duplicate item found: {item['url']}")
else:
self.urls_seen.add(item['url'])
return item
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 3.3 管道三:异步存入 MySQL 数据库
对于I/O密集型操作(如数据库写入),使用同步代码可能会阻塞 Scrapy 的事件循环,影响性能。Scrapy 基于 Twisted,天然支持异步操作。我们可以使用 twisted.enterprise.adbapi
来实现数据库连接池和异步写入。
首先,安装 pymysql
:pip install pymysql
# myproject/pipelines.py
import pymysql
from twisted.enterprise import adbapi
class AsyncMysqlPipeline:
def __init__(self, db_settings):
# 创建数据库连接池
self.db_pool = adbapi.ConnectionPool(
'pymysql', # 数据库驱动名
**db_settings
)
print(">>>>>> 异步 MySQL 管道已启动 <<<<<<")
@classmethod
def from_crawler(cls, crawler):
# 从 settings.py 中读取数据库配置
db_settings = dict(
host=crawler.settings.get('MYSQL_HOST', '127.0.0.1'),
user=crawler.settings.get('MYSQL_USER', 'root'),
password=crawler.settings.get('MYSQL_PASSWORD', ''),
db=crawler.settings.get('MYSQL_DB', 'scrapy_db'),
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor, # 使用字典游标
)
return cls(db_settings)
def process_item(self, item, spider):
# runInteraction 会将实际的数据库操作放入线程池中异步执行
query = self.db_pool.runInteraction(self.do_insert, item)
# 添加错误处理回调
query.addErrback(self.handle_error, item, spider)
return item
def do_insert(self, cursor, item):
# 执行具体的插入操作
# 使用 INSERT IGNORE 来避免因主键或唯一索引冲突而报错
sql = "INSERT IGNORE INTO duanzi (title, content, author, url, crawl_time) VALUES (%s, %s, %s, %s, %s)"
cursor.execute(sql, (
item.get('title'),
item.get('content'),
item.get('author'),
item.get('url'),
item.get('crawl_time')
))
def handle_error(self, failure, item, spider):
# 处理异步插入时可能出现的错误
spider.logger.error(f"Failed to insert item into MySQL: {failure}")
def close_spider(self, spider):
# 关闭数据库连接池
self.db_pool.close()
print(">>>>>> 异步 MySQL 管道已关闭 <<<<<<")
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
你需要预先在 MySQL 中创建好数据库和表:
CREATE DATABASE scrapy_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE scrapy_db;
CREATE TABLE duanzi (
id INT AUTO_INCREMENT PRIMARY KEY,
title VARCHAR(255) NOT NULL,
content TEXT,
author VARCHAR(100),
url VARCHAR(512) UNIQUE, -- 将 url 设为唯一索引,用于去重
crawl_time DATETIME,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
2
3
4
5
6
7
8
9
10
11
# 四、Item Exporter: 轻量级的数据导出
如果你只是想简单地将数据保存为 JSON, CSV, 或 XML 格式的文件,而不想编写完整的 Pipeline,Scrapy 提供了 Item Exporter
。
最简单的方式是通过命令行参数:
# 导出为 JSON Lines 格式(推荐)
scrapy crawl duanzi -o duanzi.jsonl
# 导出为 CSV 格式
scrapy crawl duanzi -o duanzi.csv
# 导出为 XML 格式
scrapy crawl duanzi -o duanzi.xml
2
3
4
5
6
7
8
Scrapy 会自动为你处理文件的创建、写入和关闭。这对于快速原型开发或简单的数据采集任务非常有用。
# 五、总结
- 始终使用
Item
:即使是小项目,也建议使用Item
来定义数据结构,这是一种良好的编程习惯。 - 拥抱
ItemLoader
:使用ItemLoader
和处理器来分离提取与填充逻辑,使 Spider 代码更干净、更易于维护。 - Pipeline 链式处理:将不同的处理逻辑拆分到不同的
Pipeline
中,每个Pipeline
只做一件事(单一职责原则)。利用优先级来控制它们的执行顺序。 - 异步优先:对于数据库写入等 I/O 操作,优先考虑使用异步
Pipeline
,以避免阻塞 Scrapy 的核心引擎,从而获得更好的爬取性能。 - 区分 Spider:在
Pipeline
中使用if spider.name == '...'
来为不同的爬虫应用不同的数据处理逻辑。 - 轻量级任务用 Exporter:对于简单的数据导出需求,直接使用
scrapy crawl ... -o ...
命令,无需编写任何Pipeline
代码。