# Scrapy CrawlSpider: Full-Site Crawling
When you need to crawl many pages of a site, such as every article on a blog or every story on a news site, manually hunting for the "next page" link inside the `parse` method and constructing a `Request` for it quickly becomes tedious. For this kind of full-site (or deep) crawl, Scrapy provides a more powerful spider class: `CrawlSpider`.

The core idea of `CrawlSpider` is defining rules: the spider automatically discovers, follows, and crawls links according to the rules you configure, freeing you from writing repetitive link-extraction and request-construction code.
# 1. A Deep Dive into How CrawlSpider Works
# 1.1 Core Differences Between CrawlSpider and Spider
| Feature | scrapy.Spider (base spider) | scrapy.spiders.CrawlSpider (crawling spider) |
|---|---|---|
| Base class | scrapy.Spider | scrapy.spiders.CrawlSpider |
| Link discovery | Manual. You write XPath/CSS in `parse` to find links and `yield scrapy.Request` yourself. | Automatic. You define a `rules` list and the spider discovers and requests matching links on its own. |
| Core logic | Lives in callbacks such as `parse`. | Declared in the `rules` attribute. |
| Where you write code | The `parse` method. | The `rules` list and the callbacks referenced by each `Rule` (e.g. `parse_item`). |
| The `parse` method | Free to override for data extraction. | Must not be overridden. CrawlSpider uses `parse` internally for its link handling; overriding it disables the `rules`. |
| Typical use cases | Simple crawls, complex custom logic, dynamically generated links | Well-structured sites, regular link patterns, large-scale full-site crawls |
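As a minimal illustration of the difference, the sketch below crawls a hypothetical `blog.example.com`: pagination is followed automatically and article pages are routed to a callback, with no hand-written "next page" logic. The domain, URL patterns, and selectors are assumptions for the example.

```python
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule


class BlogCrawlSpider(CrawlSpider):
    name = "blog_crawl"
    allowed_domains = ["blog.example.com"]      # hypothetical site
    start_urls = ["https://blog.example.com/"]

    rules = (
        # Follow pagination links; no callback needed
        Rule(LinkExtractor(allow=r"/page/\d+/"), follow=True),
        # Route article pages to a callback (note: not named `parse`)
        Rule(LinkExtractor(allow=r"/post/[\w-]+/"), callback="parse_post"),
    )

    def parse_post(self, response):
        yield {
            "title": response.xpath("//h1/text()").get(),
            "url": response.url,
        }
```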
# 1.2 CrawlSpider's Internal Workflow
```mermaid
graph TD
    A[start_urls] --> B[Send initial requests]
    B --> C[Built-in parse method runs]
    C --> D[Apply every Rule]
    D --> E[LinkExtractor extracts links]
    E --> F{Link matches rule?}
    F -->|yes| G[Create new request]
    F -->|no| H[Ignore link]
    G --> I{Has callback?}
    I -->|yes| J[Run callback to extract data]
    I -->|no| K[Only follow the link]
    J --> L{follow=True?}
    K --> L
    L -->|yes| D
    L -->|no| M[Stop this branch]
```
Step by step:

- Initialization: when the CrawlSpider starts, it sends a request for every URL in `start_urls`.
- Rule application: every response is processed by each of the rules defined in `rules`, in order.
- Link extraction: the rule's `LinkExtractor` pulls the matching links out of the response.
- Link filtering: extracted links are filtered by `allow`, `deny`, and the other extractor options.
- Request generation: links that pass the filters are turned into new `Request` objects.
- Callback execution: if the rule specifies a `callback`, it is called to extract data from the response.
- Deep following: if `follow=True`, the responses of the new requests are run through all the rules again.

A small sketch of the `parse_start_url` hook, which this flow uses for the `start_urls` responses, is shown below.
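Because `parse` itself is reserved by CrawlSpider (see the table above), the hook it exposes for handling the `start_urls` responses themselves is `parse_start_url`. A minimal sketch, with hypothetical selectors and URL patterns:

```python
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule


class HomePageAwareSpider(CrawlSpider):
    name = "homepage_aware"
    start_urls = ["https://www.example.com/"]

    rules = (
        Rule(LinkExtractor(allow=r"/item/\d+"), callback="parse_item"),
    )

    def parse_start_url(self, response):
        # Called for each start_urls response; the rules are still
        # applied to that same response afterwards.
        yield {"type": "homepage", "title": response.xpath("//title/text()").get()}

    def parse_item(self, response):
        yield {"url": response.url}
```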
# 1.3 Strengths and Limitations of CrawlSpider
Strengths:

- Highly automated: no hand-written link discovery or follow logic
- Concise code: complex crawl behaviour expressed as rule configuration
- Extensible: many ways to extract and filter links
- Performance-friendly: built-in deduplication and scheduling optimizations

Limitations:

- Limited flexibility: poorly suited to crawls that need complex per-page decisions
- Harder to debug: misconfigured rules can be difficult to track down
- Learning curve: you need to understand the rule system and its parameters
# 2. Core Components of CrawlSpider
# 2.1 The rules Attribute

`rules` is a tuple (or list) containing one or more `Rule` objects. Once the spider starts, every rule defined in `rules` is matched against the links found in each response.
Rule execution order:

- Rules are applied in the order they are defined in `rules`.
- Each rule processes the response independently; rules do not interfere with one another.
- If more than one rule would match the same link in a response, only the first matching rule handles it, so put the more specific rules first.
```python
# Example rule definitions
rules = (
    # Rule 1: follow pagination links
    Rule(LinkExtractor(allow=r'/page/\d+/'), follow=True),
    # Rule 2: extract data from article pages
    Rule(LinkExtractor(allow=r'/article/\d+\.html'), callback='parse_article'),
    # Rule 3: follow category links
    Rule(LinkExtractor(allow=r'/category/\w+/'), follow=True),
)
```
# 2.2 LinkExtractor in Detail

`LinkExtractor` is the core component for link discovery. It offers a rich set of parameters for controlling exactly which links get extracted.
# Basic Parameters
```python
from scrapy.linkextractors import LinkExtractor

# Basic usage
link_extractor = LinkExtractor(
    allow=r'/article/\d+\.html',      # regex for URLs to allow
    deny=r'/admin/',                  # regex for URLs to reject
    allow_domains=['example.com'],    # allowed domains
    deny_domains=['spam.com'],        # rejected domains
)
```
# Advanced Parameters
1. Regular expression filtering
```python
# A more involved regex configuration
LinkExtractor(
    allow=[
        r'/news/\d{4}/\d{2}/\d{2}/',   # dated news URLs: /news/2023/12/25/
        r'/article/\w+\.html',         # article pages
        r'/category/[\w-]+/$'          # category pages
    ],
    deny=[
        r'/admin/',                    # admin pages
        r'/login',                     # login pages
        r'\.pdf$',                     # PDF files
        r'/api/',                      # API endpoints
    ]
)
```
2. Scope-restriction parameters
```python
# Restricting extraction with XPath
LinkExtractor(
    allow=r'/article/',
    restrict_xpaths=[
        '//div[@class="content-area"]',   # only look for links in the content area
        '//nav[@class="pagination"]'      # and in the pagination nav
    ]
)

# Restricting extraction with CSS selectors
LinkExtractor(
    allow=r'/news/',
    restrict_css=[
        '.news-list',     # news list area
        '.pagination'     # pagination area
    ]
)
```
3. Tag and attribute control
```python
# Custom tags and attributes
LinkExtractor(
    tags=['a', 'area', 'link'],   # extract links from these tags
    attrs=['href', 'src'],        # and from these attributes
    allow=r'/download/'
)

# Practical example: extract image links
LinkExtractor(
    tags=['img'],                 # only <img> tags
    attrs=['src'],                # read the src attribute
    allow=r'\.(jpg|png|gif)$'     # keep image files only
)
```
4. Link-processing parameters
```python
# URL normalization and deduplication
LinkExtractor(
    allow=r'/article/',
    canonicalize=True,   # canonicalize URLs (default is False)
    unique=True,         # drop duplicate links (default True)
    strip=True           # strip surrounding whitespace from URLs (default True)
)

# Link rewriting
LinkExtractor(
    allow=r'/page/',
    process_value=lambda x: x.replace('http://', 'https://')   # pre-process each URL
)
```
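`process_value` can also drop links entirely: returning `None` tells the extractor to ignore that value, which is handy for things like `javascript:` pseudo-links. A small sketch:

```python
from scrapy.linkextractors import LinkExtractor


def clean_link(value):
    # LinkExtractor ignores any link for which process_value returns None
    if value.startswith('javascript:'):
        return None
    return value

LinkExtractor(process_value=clean_link)
```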
# LinkExtractor in Practice
```python
# Link-extraction configuration for a more complex site
import re

from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule


class NewsSpider(CrawlSpider):
    name = 'news'
    allowed_domains = ['news.example.com']
    start_urls = ['https://news.example.com/']

    rules = (
        # Rule 1: pagination links (follow only, no data extraction)
        Rule(
            LinkExtractor(
                allow=r'/page/\d+/',
                restrict_css='.pagination'         # only look inside the pagination area
            ),
            follow=True,
            process_links='process_page_links'     # custom link processing
        ),
        # Rule 2: category page links
        Rule(
            LinkExtractor(
                allow=r'/category/[\w-]+/$',
                restrict_xpaths='//nav[@class="categories"]'
            ),
            follow=True
        ),
        # Rule 3: news detail page links
        Rule(
            LinkExtractor(
                allow=r'/news/\d{4}/\d{2}/\d{2}/[\w-]+\.html',
                restrict_css='.news-list',
                deny=r'/news/.*/(comments|share)/'   # skip comment and share links
            ),
            callback='parse_news',
            follow=False
        ),
    )

    def process_page_links(self, links):
        """Custom handling of pagination links."""
        # Drop links beyond page 100
        filtered_links = []
        for link in links:
            page_match = re.search(r'/page/(\d+)/', link.url)
            if page_match and int(page_match.group(1)) <= 100:
                filtered_links.append(link)
        return filtered_links
```
# 2.3 Rule

A `Rule` object defines how the spider handles the links extracted by its `LinkExtractor`.
# Full List of Rule Parameters
```python
from scrapy.spiders import Rule
from scrapy.linkextractors import LinkExtractor

Rule(
    link_extractor,          # a LinkExtractor instance (required)
    callback=None,           # name of the callback (string)
    cb_kwargs=None,          # extra kwargs passed to the callback (dict)
    follow=None,             # whether to follow links from matched pages (bool)
    process_links=None,      # name of a link-processing method (string)
    process_request=None,    # name of a request-processing method (string)
    errback=None             # name of an error-handling method (string)
)
```
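For completeness, here is a minimal sketch of wiring an `errback` through a Rule (supported in recent Scrapy versions, roughly 2.0 and later); the spider name and URL pattern are placeholders:

```python
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule


class SafeSpider(CrawlSpider):
    name = 'safe'
    start_urls = ['https://www.example.com/']

    rules = (
        Rule(
            LinkExtractor(allow=r'/item/'),
            callback='parse_item',
            errback='handle_error',   # called when the request errors out
        ),
    )

    def parse_item(self, response):
        yield {'url': response.url}

    def handle_error(self, failure):
        # failure is a twisted Failure; failure.request is the failed request
        self.logger.error(f"Request failed: {failure.request.url}")
```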
# Parameter Details
1. callback: the callback function
```python
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule


class MySpider(CrawlSpider):
    rules = (
        Rule(
            LinkExtractor(allow=r'/article/'),
            callback='parse_article',          # name of the callback
            cb_kwargs={'category': 'news'}     # extra keyword arguments
        ),
    )

    def parse_article(self, response, category=None):
        """Parse an article page."""
        item = {
            'title': response.xpath('//h1/text()').get(),
            'category': category,   # value passed in via cb_kwargs
            'url': response.url
        }
        yield item
```
2. follow: link-following control
```python
rules = (
    # Follow links only, extract no data
    Rule(LinkExtractor(allow=r'/page/'), follow=True),
    # Extract data, do not follow further links
    Rule(LinkExtractor(allow=r'/article/'), callback='parse_item', follow=False),
    # Extract data and keep following links
    Rule(LinkExtractor(allow=r'/category/'), callback='parse_category', follow=True),
)
```
3. Link and request processing functions
```python
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule


class AdvancedSpider(CrawlSpider):
    rules = (
        Rule(
            LinkExtractor(allow=r'/product/'),
            callback='parse_product',
            process_links='filter_links',      # link-filtering method
            process_request='add_headers'      # request pre-processing method
        ),
    )

    def filter_links(self, links):
        """Filter the extracted links."""
        # Keep only links containing certain keywords
        filtered = []
        for link in links:
            if any(keyword in link.url for keyword in ['phone', 'laptop', 'tablet']):
                filtered.append(link)
        return filtered

    def add_headers(self, request, response):
        """Add custom headers to each request."""
        request.headers['User-Agent'] = 'Custom Spider 1.0'
        request.headers['Referer'] = response.url
        return request

    def parse_product(self, response):
        """Parse a product page."""
        yield {
            'name': response.xpath('//h1/text()').get(),
            'price': response.xpath('//span[@class="price"]/text()').get(),
        }
```
# 3. Worked Examples: Crawling Different Kinds of Sites
# 3.1 Crawling a News Site
```python
# spiders/news_spider.py
import re
from datetime import datetime, timedelta

import scrapy
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule


class NewsSpider(CrawlSpider):
    name = 'news'
    allowed_domains = ['news.example.com']
    start_urls = ['https://news.example.com/']

    # Spider-specific settings
    custom_settings = {
        'DOWNLOAD_DELAY': 1,
        'CONCURRENT_REQUESTS_PER_DOMAIN': 2,
        'DEPTH_LIMIT': 3,   # limit the crawl depth
    }

    rules = (
        # Rule 1: pagination on the home page and category pages
        Rule(
            LinkExtractor(
                allow=r'/(page/\d+/|category/[\w-]+/page/\d+/)$',
                restrict_css='.pagination, .category-nav'
            ),
            follow=True,
            process_links='filter_recent_pages'
        ),
        # Rule 2: category pages
        Rule(
            LinkExtractor(
                allow=r'/category/[\w-]+/$',
                restrict_xpaths='//nav[@class="main-nav"]'
            ),
            follow=True
        ),
        # Rule 3: news detail pages
        Rule(
            LinkExtractor(
                allow=r'/\d{4}/\d{2}/\d{2}/[\w-]+\.html$',
                restrict_css='.news-list, .featured-news',
                deny=r'/(tag|author|comment)/'
            ),
            callback='parse_news',
            process_request='add_timestamp'
        ),
        # Rule 4: special-topic pages
        Rule(
            LinkExtractor(
                allow=r'/special/[\w-]+/$',
                restrict_css='.special-topics'
            ),
            callback='parse_special',
            follow=True
        ),
    )

    def filter_recent_pages(self, links):
        """Only crawl dated pages from the last 30 days."""
        cutoff_date = datetime.now() - timedelta(days=30)
        filtered = []
        for link in links:
            # Pull the date out of the URL
            date_match = re.search(r'/(\d{4})/(\d{2})/(\d{2})/', link.url)
            if date_match:
                year, month, day = map(int, date_match.groups())
                link_date = datetime(year, month, day)
                if link_date >= cutoff_date:
                    filtered.append(link)
            else:
                # Not a dated page, keep it
                filtered.append(link)
        self.logger.info(f"Filtered {len(links)} links to {len(filtered)}")
        return filtered

    def add_timestamp(self, request, response):
        """Attach a crawl timestamp to the request."""
        request.meta['crawl_time'] = datetime.now().isoformat()
        return request

    def parse_news(self, response):
        """Parse a news detail page."""
        # Basic information
        title = response.xpath('//h1[@class="article-title"]/text()').get()
        if not title:
            self.logger.warning(f"No title found for {response.url}")
            return

        # Publication time
        pub_time = response.xpath('//time[@class="publish-time"]/@datetime').get()
        if not pub_time:
            pub_time = response.xpath('//span[@class="date"]/text()').get()

        # Author
        author = response.xpath('//span[@class="author"]/text()').get()

        # Body text
        content_parts = response.xpath('//div[@class="article-content"]//p/text()').getall()
        content = '\n'.join(part.strip() for part in content_parts if part.strip())

        # Tags
        tags = response.xpath('//div[@class="tags"]//a/text()').getall()

        # Category (second-to-last breadcrumb entry)
        category = response.xpath('//nav[@class="breadcrumb"]//a[last()-1]/text()').get()

        yield {
            'title': title.strip(),
            'content': content,
            'author': author.strip() if author else None,
            'publish_time': pub_time,
            'category': category.strip() if category else None,
            'tags': [tag.strip() for tag in tags],
            'url': response.url,
            'crawl_time': response.meta.get('crawl_time'),
            'word_count': len(content) if content else 0,
        }

    def parse_special(self, response):
        """Parse a special-topic page."""
        yield {
            'type': 'special_topic',
            'title': response.xpath('//h1/text()').get(),
            'description': response.xpath('//div[@class="description"]/text()').get(),
            'url': response.url,
            'article_count': len(response.xpath('//div[@class="article-list"]//a').getall()),
        }
```
# 3.2 Crawling Products from an E-commerce Site
```python
# spiders/ecommerce_spider.py
import json
import re

import scrapy
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule


class EcommerceSpider(CrawlSpider):
    name = 'ecommerce'
    allowed_domains = ['shop.example.com']
    start_urls = ['https://shop.example.com/']

    custom_settings = {
        'DOWNLOAD_DELAY': 2,
        'RANDOMIZE_DOWNLOAD_DELAY': 0.5,
        'USER_AGENT': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'COOKIES_ENABLED': True,
    }

    rules = (
        # Rule 1: category pages
        Rule(
            LinkExtractor(
                allow=r'/category/[\w-]+/$',
                restrict_css='.category-menu, .sidebar-categories'
            ),
            follow=True,
            callback='parse_category'
        ),
        # Rule 2: pagination within category pages
        Rule(
            LinkExtractor(
                allow=r'/category/[\w-]+/page/\d+/$',
                restrict_css='.pagination'
            ),
            follow=True
        ),
        # Rule 3: product detail pages
        Rule(
            LinkExtractor(
                allow=r'/product/[\w-]+\.html$',
                restrict_css='.product-list, .featured-products',
                deny=r'/(review|compare|wishlist)/'
            ),
            callback='parse_product',
            process_request='add_product_headers'
        ),
        # Rule 4: brand pages
        Rule(
            LinkExtractor(
                allow=r'/brand/[\w-]+/$',
                restrict_css='.brand-list'
            ),
            follow=True
        ),
    )

    def add_product_headers(self, request, response):
        """Add extra headers for product page requests."""
        request.headers.update({
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
            'Accept-Encoding': 'gzip, deflate',
            'Referer': response.url
        })
        return request

    def parse_category(self, response):
        """Parse a category page."""
        category_name = response.xpath('//h1[@class="category-title"]/text()').get()
        product_count = response.xpath('//span[@class="product-count"]/text()').re_first(r'(\d+)')

        yield {
            'type': 'category',
            'name': category_name.strip() if category_name else None,
            'product_count': int(product_count) if product_count else 0,
            'url': response.url,
        }

    def parse_product(self, response):
        """Parse a product detail page."""
        # Basic information
        name = response.xpath('//h1[@class="product-title"]/text()').get()
        if not name:
            self.logger.warning(f"No product name found for {response.url}")
            return

        # Price information
        current_price = self.extract_price(
            response.xpath('//span[@class="current-price"]/text()').get()
        )
        original_price = self.extract_price(
            response.xpath('//span[@class="original-price"]/text()').get()
        )

        # Product description
        description = response.xpath('//div[@class="product-description"]//text()').getall()
        description = ' '.join(part.strip() for part in description if part.strip())

        # Specification table
        specs = {}
        spec_rows = response.xpath('//table[@class="specifications"]//tr')
        for row in spec_rows:
            key = row.xpath('.//td[1]/text()').get()
            value = row.xpath('.//td[2]/text()').get()
            if key and value:
                specs[key.strip()] = value.strip()

        # Image URLs
        images = response.xpath('//div[@class="product-images"]//img/@src').getall()
        images = [response.urljoin(img) for img in images]

        # Rating and review count
        rating = response.xpath('//span[@class="rating-score"]/text()').get()
        review_count = response.xpath('//span[@class="review-count"]/text()').re_first(r'(\d+)')

        # Stock status
        stock_status = response.xpath('//span[@class="stock-status"]/text()').get()

        # Category breadcrumb
        breadcrumb = response.xpath('//nav[@class="breadcrumb"]//a/text()').getall()
        category_path = ' > '.join(breadcrumb[1:]) if len(breadcrumb) > 1 else None

        yield {
            'name': name.strip(),
            'current_price': current_price,
            'original_price': original_price,
            'discount': self.calculate_discount(original_price, current_price),
            'description': description,
            'specifications': specs,
            'images': images,
            'rating': float(rating) if rating else None,
            'review_count': int(review_count) if review_count else 0,
            'stock_status': stock_status.strip() if stock_status else None,
            'category_path': category_path,
            'url': response.url,
        }

        # If the product has reviews, crawl them as well
        review_url = response.xpath('//a[@class="view-reviews"]/@href').get()
        if review_url:
            yield response.follow(
                review_url,
                callback=self.parse_reviews,
                meta={'product_name': name.strip()}
            )

    def extract_price(self, price_text):
        """Extract a numeric price from a text snippet."""
        if not price_text:
            return None
        # Drop currency symbols and thousands separators, keep the number
        price_match = re.search(r'[\d,]+\.?\d*', price_text.replace(',', ''))
        return float(price_match.group()) if price_match else None

    def calculate_discount(self, original, current):
        """Compute the discount percentage."""
        if not original or not current or original <= current:
            return 0
        return round((original - current) / original * 100, 2)

    def parse_reviews(self, response):
        """Parse product reviews."""
        product_name = response.meta.get('product_name')

        reviews = response.xpath('//div[@class="review-item"]')
        for review in reviews:
            yield {
                'type': 'review',
                'product_name': product_name,
                'reviewer': review.xpath('.//span[@class="reviewer-name"]/text()').get(),
                'rating': len(review.xpath('.//span[@class="star filled"]')),
                'content': review.xpath('.//div[@class="review-content"]/text()').get(),
                'date': review.xpath('.//span[@class="review-date"]/text()').get(),
                'url': response.url,
            }
```
# 3.3 Crawling a Blog or Forum
```python
# spiders/blog_spider.py
import hashlib
from datetime import datetime

import scrapy
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule


class BlogSpider(CrawlSpider):
    name = 'blog'
    allowed_domains = ['blog.example.com']
    start_urls = ['https://blog.example.com/']

    custom_settings = {
        'DEPTH_LIMIT': 5,
        'DOWNLOAD_DELAY': 0.5,
        'DUPEFILTER_DEBUG': True,   # log duplicate-filter decisions
    }

    rules = (
        # Rule 1: archive pages (by year/month)
        Rule(
            LinkExtractor(
                allow=r'/\d{4}/(\d{2}/)?$',   # year or year/month pages
                restrict_css='.archive-nav, .sidebar'
            ),
            follow=True
        ),
        # Rule 2: tag and category pages
        Rule(
            LinkExtractor(
                allow=r'/(tag|category)/[\w-]+/$',
                restrict_css='.tag-cloud, .categories'
            ),
            follow=True,
            callback='parse_taxonomy'
        ),
        # Rule 3: pagination links
        Rule(
            LinkExtractor(
                allow=r'/(page/\d+/|[\w-]+/page/\d+/)$',
                restrict_css='.pagination, .nav-links'
            ),
            follow=True
        ),
        # Rule 4: post detail pages
        Rule(
            LinkExtractor(
                allow=r'/\d{4}/\d{2}/\d{2}/[\w-]+/$',   # date-based post URLs
                restrict_css='.post-list, .content-area',
                deny=r'/(feed|rss|xml|json)$'
            ),
            callback='parse_post',
            process_request='add_post_meta'
        ),
        # Rule 5: static pages (not posts)
        Rule(
            LinkExtractor(
                allow=r'/[\w-]+/$',
                deny=r'/(admin|login|register|wp-|feed)',
                restrict_css='.main-nav, .footer-nav'
            ),
            callback='parse_page'
        ),
    )

    def add_post_meta(self, request, response):
        """Attach metadata to post requests."""
        request.meta.update({
            'source_page': response.url,
            'crawl_timestamp': datetime.now().isoformat(),
            'depth': response.meta.get('depth', 0) + 1
        })
        return request

    def parse_taxonomy(self, response):
        """Parse a tag or category page."""
        # Determine whether this is a tag or a category
        page_type = 'tag' if '/tag/' in response.url else 'category'
        name = response.url.split('/')[-2]   # take the name from the URL

        # Description
        description = response.xpath('//div[@class="taxonomy-description"]/text()').get()

        # Number of posts listed on the page
        post_count = len(response.xpath('//article[@class="post"]'))

        yield {
            'type': page_type,
            'name': name,
            'display_name': response.xpath('//h1/text()').get(),
            'description': description.strip() if description else None,
            'post_count': post_count,
            'url': response.url,
        }

    def parse_post(self, response):
        """Parse a blog post."""
        # Post title
        title = response.xpath('//h1[@class="entry-title"]/text()').get()
        if not title:
            title = response.xpath('//title/text()').get()
            if title:
                title = title.split(' | ')[0]   # drop the site name

        if not title:
            self.logger.warning(f"No title found for {response.url}")
            return

        # Publication date
        pub_date = response.xpath('//time[@class="entry-date"]/@datetime').get()
        if not pub_date:
            pub_date = response.xpath('//meta[@property="article:published_time"]/@content').get()

        # Author
        author = response.xpath('//span[@class="author"]/text()').get()
        if not author:
            author = response.xpath('//meta[@name="author"]/@content').get()

        # Post body: try several selectors in turn
        content_selectors = [
            '//div[@class="entry-content"]//text()',
            '//div[@class="post-content"]//text()',
            '//article[@class="post"]//p//text()'
        ]
        content = None
        for selector in content_selectors:
            content_parts = response.xpath(selector).getall()
            if content_parts:
                content = '\n'.join(part.strip() for part in content_parts if part.strip())
                break

        # Tags and categories
        tags = response.xpath('//div[@class="post-tags"]//a/text()').getall()
        categories = response.xpath('//div[@class="post-categories"]//a/text()').getall()

        # Excerpt
        excerpt = response.xpath('//meta[@name="description"]/@content').get()
        if not excerpt and content:
            # No explicit excerpt: use the first 200 characters of the body
            excerpt = content[:200] + '...' if len(content) > 200 else content

        # Comment count
        comment_count = response.xpath('//span[@class="comments-count"]/text()').re_first(r'(\d+)')

        # Estimated reading time (based on word count)
        word_count = len(content.split()) if content else 0
        reading_time = max(1, word_count // 200)   # assume 200 words per minute

        # Content hash (used for deduplication)
        content_hash = hashlib.md5(
            (title + (content or '')).encode('utf-8')
        ).hexdigest()

        yield {
            'title': title.strip(),
            'content': content,
            'excerpt': excerpt.strip() if excerpt else None,
            'author': author.strip() if author else None,
            'publish_date': pub_date,
            'tags': [tag.strip() for tag in tags],
            'categories': [cat.strip() for cat in categories],
            'comment_count': int(comment_count) if comment_count else 0,
            'word_count': word_count,
            'reading_time': reading_time,
            'content_hash': content_hash,
            'url': response.url,
            'source_page': response.meta.get('source_page'),
            'crawl_depth': response.meta.get('depth', 0),
        }

        # If the post links to its comments, crawl them too
        comments_url = response.xpath('//a[contains(@class, "comments-link")]/@href').get()
        if comments_url and comment_count and int(comment_count) > 0:
            yield response.follow(
                comments_url,
                callback=self.parse_comments,
                meta={'post_title': title.strip()}
            )

    def parse_page(self, response):
        """Parse a static page."""
        title = response.xpath('//h1/text()').get()
        if not title:
            title = response.xpath('//title/text()').get()

        content = response.xpath('//div[@class="page-content"]//text()').getall()
        content = '\n'.join(part.strip() for part in content if part.strip())

        yield {
            'type': 'page',
            'title': title.strip() if title else None,
            'content': content,
            'url': response.url,
        }

    def parse_comments(self, response):
        """Parse comments."""
        post_title = response.meta.get('post_title')

        comments = response.xpath('//div[@class="comment"]')
        for comment in comments:
            yield {
                'type': 'comment',
                'post_title': post_title,
                'author': comment.xpath('.//span[@class="comment-author"]/text()').get(),
                'content': comment.xpath('.//div[@class="comment-content"]/text()').get(),
                'date': comment.xpath('.//time/@datetime').get(),
                'url': response.url,
            }
```
# 4. Debugging and Optimization Tips
# 4.1 Debugging a CrawlSpider
# Enable Verbose Logging
```python
# settings.py
LOG_LEVEL = 'DEBUG'
LOG_FILE = 'scrapy_debug.log'

# Turn on debug output for specific components
LOGSTATS_INTERVAL = 10     # log crawl statistics every 10 seconds
DUPEFILTER_DEBUG = True    # log duplicate-filter decisions
SCHEDULER_DEBUG = True     # log scheduler debug information
```
# Debugging Rules with the Scrapy Shell
```
# Test a LinkExtractor interactively
scrapy shell "https://example.com"

# Inside the shell, try extracting links
>>> from scrapy.linkextractors import LinkExtractor
>>> le = LinkExtractor(allow=r'/article/\d+\.html')
>>> links = le.extract_links(response)
>>> for link in links:
...     print(link.url)

# Test an XPath restriction
>>> le = LinkExtractor(
...     allow=r'/news/',
...     restrict_xpaths='//div[@class="news-list"]'
... )
>>> links = le.extract_links(response)
>>> len(links)
```
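The `scrapy parse` command can also exercise the rules directly: with its `--rules` option it uses the spider's CrawlSpider rules to pick the callback for a given URL, so you can check that a page is routed where you expect. The spider name and URL below are taken from the hypothetical news example:

```bash
# Route the URL through the spider's rules and show the items/requests produced
scrapy parse --spider=news --rules "https://news.example.com/news/2023/12/25/sample-article.html"
```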
# A Custom Debugging Middleware
```python
# middlewares.py
from scrapy import signals


class CrawlSpiderDebugMiddleware:
    """Downloader middleware that reports CrawlSpider rule/link statistics."""

    def __init__(self):
        self.link_stats = {}
        self.rule_stats = {}

    @classmethod
    def from_crawler(cls, crawler):
        # Connect spider_closed so the summary is logged when the crawl ends
        middleware = cls()
        crawler.signals.connect(middleware.spider_closed, signal=signals.spider_closed)
        return middleware

    def process_request(self, request, spider):
        # Record which rule produced the request
        if hasattr(request, 'meta') and 'rule' in request.meta:
            rule_id = request.meta['rule']
            self.rule_stats[rule_id] = self.rule_stats.get(rule_id, 0) + 1
        return None

    def process_response(self, request, response, spider):
        # Re-run each rule's extractor to see how many links it finds
        if hasattr(spider, 'rules'):
            for i, rule in enumerate(spider.rules):
                links = rule.link_extractor.extract_links(response)
                self.link_stats[f'rule_{i}'] = len(links)
                spider.logger.debug(
                    f"Rule {i} extracted {len(links)} links from {response.url}"
                )
        return response

    def spider_closed(self, spider):
        spider.logger.info(f"Link extraction stats: {self.link_stats}")
        spider.logger.info(f"Rule usage stats: {self.rule_stats}")
```
# 4.2 Performance Optimization
# Limiting Crawl Depth and Scope
```python
from scrapy.spiders import CrawlSpider


class OptimizedSpider(CrawlSpider):
    custom_settings = {
        'DEPTH_LIMIT': 3,        # limit the crawl depth
        'DEPTH_PRIORITY': 1,     # positive value + FIFO queues = breadth-first order
        'SCHEDULER_DISK_QUEUE': 'scrapy.squeues.PickleFifoDiskQueue',
        'SCHEDULER_MEMORY_QUEUE': 'scrapy.squeues.FifoMemoryQueue',
        # Limit concurrency
        'CONCURRENT_REQUESTS': 16,
        'CONCURRENT_REQUESTS_PER_DOMAIN': 8,
        # Set a download timeout
        'DOWNLOAD_TIMEOUT': 30,
        # Enable compression
        'COMPRESSION_ENABLED': True,
    }

    def __init__(self, max_pages=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.max_pages = int(max_pages) if max_pages else None
        self.page_count = 0

    def parse_start_url(self, response, **kwargs):
        """Override to count pages against the max_pages limit."""
        if self.max_pages and self.page_count >= self.max_pages:
            self.logger.info(f"Reached max pages limit: {self.max_pages}")
            return
        self.page_count += 1
        return super().parse_start_url(response, **kwargs)
```
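Spider arguments such as `max_pages` are passed on the command line with `-a`; assuming the spider above is named `optimized`:

```bash
scrapy crawl optimized -a max_pages=500
```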
# Smarter Deduplication
```python
# A custom duplicate filter (sketch)
from urllib.parse import urlparse, parse_qs, urlencode


class SmartDupeFilter:
    def __init__(self):
        self.seen_urls = set()
        self.seen_content_hashes = set()   # reserved for content-level dedup

    def request_seen(self, request):
        """Return True if an equivalent request has already been seen."""
        url_key = self.request_fingerprint(request)
        if url_key in self.seen_urls:
            return True
        self.seen_urls.add(url_key)
        return False

    def request_fingerprint(self, request):
        """Build a fingerprint that ignores unimportant URL parameters."""
        parsed = urlparse(request.url)
        params = parse_qs(parsed.query)

        # Strip common tracking parameters
        tracking_params = ['utm_source', 'utm_medium', 'utm_campaign', 'ref']
        for param in tracking_params:
            params.pop(param, None)

        clean_query = urlencode(params, doseq=True)
        clean_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
        if clean_query:
            clean_url += f"?{clean_query}"
        return clean_url
```
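The class above only sketches the fingerprinting idea. To actually plug a custom filter into Scrapy it should subclass `scrapy.dupefilters.BaseDupeFilter` (or extend `RFPDupeFilter`) and be enabled via the `DUPEFILTER_CLASS` setting; the module path below is an assumption for your project layout:

```python
# settings.py
DUPEFILTER_CLASS = "myproject.dupefilters.SmartDupeFilter"
```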
# 4.3 Error Handling and Retries
```python
import scrapy
from scrapy.spiders import CrawlSpider


class RobustSpider(CrawlSpider):
    custom_settings = {
        'RETRY_TIMES': 3,
        'RETRY_HTTP_CODES': [500, 502, 503, 504, 408, 429],
        'DOWNLOAD_DELAY': 1,
        'RANDOMIZE_DOWNLOAD_DELAY': 0.5,
        # Enable automatic throttling
        'AUTOTHROTTLE_ENABLED': True,
        'AUTOTHROTTLE_START_DELAY': 1,
        'AUTOTHROTTLE_MAX_DELAY': 60,
        'AUTOTHROTTLE_TARGET_CONCURRENCY': 2.0,
    }

    def parse_item(self, response):
        """Parse callback with error handling."""
        try:
            # Check that the response looks valid
            if not self.is_valid_response(response):
                self.logger.warning(f"Invalid response from {response.url}")
                return

            # Extract the data
            item = self.extract_item_data(response)
            if item:
                yield item
            else:
                self.logger.warning(f"No data extracted from {response.url}")

        except Exception as e:
            self.logger.error(f"Error parsing {response.url}: {str(e)}")
            # Optionally reschedule the request (bounded to avoid endless loops)
            retry_count = response.meta.get('retry_count', 0)
            if retry_count < 2:
                yield scrapy.Request(
                    response.url,
                    callback=self.parse_item,
                    dont_filter=True,
                    meta={'retry_count': retry_count + 1}
                )

    def is_valid_response(self, response):
        """Check whether the response is usable."""
        if response.status != 200:
            return False

        # Check for common error-page markers
        error_indicators = [
            '404 Not Found',
            'Page Not Found',
            'Access Denied',
            'Service Unavailable'
        ]
        for indicator in error_indicators:
            if indicator.lower() in response.text.lower():
                return False
        return True

    def extract_item_data(self, response):
        """Extract item data (subclasses should override this)."""
        return {
            'url': response.url,
            'title': response.xpath('//title/text()').get(),
        }
```
# 5. Advanced Usage and Techniques
# 5.1 Generating Rules Dynamically
```python
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule


class DynamicRulesSpider(CrawlSpider):
    name = 'dynamic_rules'

    def __init__(self, categories=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Build the rules from the spider arguments
        self.rules = self.generate_rules(categories)
        # Recompile: CrawlSpider compiled the (empty) class-level rules in
        # __init__, before self.rules was replaced
        self._compile_rules()

    def generate_rules(self, categories):
        """Generate crawl rules for the given categories."""
        rules = []

        if categories:
            category_list = categories.split(',')
            for category in category_list:
                # One pair of rules per category
                rules.append(
                    Rule(
                        LinkExtractor(allow=rf'/category/{category}/'),
                        follow=True
                    )
                )
                rules.append(
                    Rule(
                        LinkExtractor(allow=rf'/{category}/[\w-]+\.html'),
                        callback='parse_item'
                    )
                )
        else:
            # Default rule
            rules = [
                Rule(LinkExtractor(allow=r'/.*'), callback='parse_item')
            ]
        return tuple(rules)
```
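The rules are then driven entirely by the `-a` spider argument; the category values below are hypothetical:

```bash
scrapy crawl dynamic_rules -a categories=tech,sports
```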
# 5.2 Unified Crawling Across Multiple Sites
```python
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule


class MultiSiteSpider(CrawlSpider):
    name = 'multi_site'

    # Per-site configuration
    SITE_CONFIGS = {
        'site1.com': {
            'rules': [
                Rule(LinkExtractor(allow=r'/news/'), callback='parse_news'),
                Rule(LinkExtractor(allow=r'/page/\d+/'), follow=True),
            ],
            'selectors': {
                'title': '//h1[@class="title"]/text()',
                'content': '//div[@class="content"]//text()',
            }
        },
        'site2.com': {
            'rules': [
                Rule(LinkExtractor(allow=r'/article/'), callback='parse_article'),
                Rule(LinkExtractor(allow=r'/\d+/'), follow=True),
            ],
            'selectors': {
                'title': '//h2[@class="post-title"]/text()',
                'content': '//article//p//text()',
            }
        }
    }

    def __init__(self, sites=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if sites:
            site_list = sites.split(',')
            self.allowed_domains = site_list
            self.start_urls = [f'https://{site}' for site in site_list]

        # Merge the rules of every selected site
        all_rules = []
        for site, config in self.SITE_CONFIGS.items():
            if not sites or site in sites:
                all_rules.extend(config['rules'])
        self.rules = tuple(all_rules)
        self._compile_rules()   # recompile after replacing self.rules

    def parse_news(self, response):
        """Parse a news page."""
        return self.parse_with_config(response, 'news')

    def parse_article(self, response):
        """Parse an article page."""
        return self.parse_with_config(response, 'article')

    def parse_with_config(self, response, content_type):
        """Extract content using the per-site selector configuration."""
        domain = response.url.split('/')[2]
        config = self.SITE_CONFIGS.get(domain)

        if not config:
            self.logger.warning(f"No config found for domain: {domain}")
            return

        selectors = config['selectors']
        yield {
            'type': content_type,
            'site': domain,
            'title': response.xpath(selectors['title']).get(),
            'content': ' '.join(response.xpath(selectors['content']).getall()),
            'url': response.url,
        }
```
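Run it against a subset of the configured sites with the same `-a` mechanism:

```bash
scrapy crawl multi_site -a sites=site1.com,site2.com
```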
# 5.3 Incremental Crawling
```python
from datetime import datetime, timedelta

import redis
import scrapy
from scrapy.spiders import CrawlSpider


class IncrementalSpider(CrawlSpider):
    name = 'incremental'

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        self.last_crawl_key = f"{self.name}:last_crawl"

    def start_requests(self):
        """Generate the initial requests, taking the last crawl time into account."""
        last_crawl = self.redis_client.get(self.last_crawl_key)

        if last_crawl:
            last_crawl_time = datetime.fromisoformat(last_crawl.decode())
            self.logger.info(f"Last crawl: {last_crawl_time}")
            # Only crawl content updated since then (with a one-hour overlap)
            cutoff_time = last_crawl_time - timedelta(hours=1)
            self.cutoff_timestamp = cutoff_time.timestamp()
        else:
            self.logger.info("First time crawling")
            self.cutoff_timestamp = 0

        for url in self.start_urls:
            yield scrapy.Request(url, callback=self.parse)

    def parse_item(self, response):
        """Check the publication time before extracting."""
        # Publication / update time of the article
        pub_time_str = response.xpath('//time/@datetime').get()

        if pub_time_str:
            try:
                pub_time = datetime.fromisoformat(pub_time_str.replace('Z', '+00:00'))
                pub_timestamp = pub_time.timestamp()
                # Skip articles older than the cutoff
                if pub_timestamp < self.cutoff_timestamp:
                    self.logger.debug(f"Skipping old article: {response.url}")
                    return
            except ValueError:
                self.logger.warning(f"Could not parse date: {pub_time_str}")

        # Extract the data
        yield {
            'title': response.xpath('//h1/text()').get(),
            'content': ' '.join(response.xpath('//div[@class="content"]//text()').getall()),
            'publish_time': pub_time_str,
            'url': response.url,
            'crawl_time': datetime.now().isoformat(),
        }

    def closed(self, reason):
        """Record the crawl time when the spider finishes."""
        current_time = datetime.now().isoformat()
        self.redis_client.set(self.last_crawl_key, current_time)
        self.logger.info(f"Crawl completed. Next incremental crawl will start from {current_time}")
```