14 - Scrapy Advanced: Distributed Crawlers and High-Performance Optimization

Master Scrapy's advanced features and build high-concurrency, high-availability distributed crawlers!

I. Download Middleware

1. Custom Middleware (Proxy and Retry)

# middlewares.py
import random


class ProxyMiddleware:
    """Proxy middleware: attach a random proxy to every outgoing request."""

    PROXIES = [
        'http://ip1:port',
        'http://ip2:port',
    ]

    def process_request(self, request, spider):
        proxy = random.choice(self.PROXIES)
        request.meta['proxy'] = proxy
        spider.logger.info(f'Using proxy: {proxy}')


class RetryMiddleware:
    """Retry middleware: re-schedule failed requests."""

    RETRY_CODES = [500, 502, 503, 504, 408]

    def process_response(self, request, response, spider):
        if response.status in self.RETRY_CODES:
            spider.logger.warning(f'Request failed with {response.status}, retrying...')
            return self._retry(request)
        return response

    def process_exception(self, request, exception, spider):
        spider.logger.error(f'Exception: {exception}')
        return self._retry(request)

    def _retry(self, request):
        # Copy the request and bypass the dupefilter so it is actually re-scheduled
        retry_req = request.copy()
        retry_req.dont_filter = True
        return retry_req

2. Enabling the Middleware

# settings.py
DOWNLOADER_MIDDLEWARES = {
    'tutorial.middlewares.ProxyMiddleware': 543,
    'tutorial.middlewares.RetryMiddleware': 544,
}
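If you run a custom retry middleware like the one above, you will usually also want to switch off Scrapy's built-in RetryMiddleware so the same request is not retried twice; a minimal sketch:

# settings.py (sketch: disable the built-in retry middleware to avoid double retries)
DOWNLOADER_MIDDLEWARES = {
    'tutorial.middlewares.ProxyMiddleware': 543,
    'tutorial.middlewares.RetryMiddleware': 544,
    'scrapy.downloadermiddlewares.retry.RetryMiddleware': None,
}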

II. Request Deduplication

1. Custom Dedup Filter (default: RFPDupeFilter)

# Custom dedup rule (Scrapy's default is RFPDupeFilter, which fingerprints requests)
from scrapy.dupefilters import BaseDupeFilter


class URLNormalizeFilter(BaseDupeFilter):
    def __init__(self):
        self.seen_urls = set()

    def request_seen(self, request):
        # Normalize the URL before checking
        normalized = self.normalize_url(request.url)
        if normalized in self.seen_urls:
            return True
        self.seen_urls.add(normalized)
        return False

    def normalize_url(self, url):
        # Strip the query string / tracking parameters
        return url.split('?')[0]
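To make Scrapy use this filter instead of the default, point DUPEFILTER_CLASS at it; a minimal sketch, assuming the class lives in tutorial/dupefilters.py:

# settings.py (assumed module path)
DUPEFILTER_CLASS = 'tutorial.dupefilters.URLNormalizeFilter'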

2. Redis-Based Deduplication (Distributed)

# Bloom-filter dedup backed by Redis (requires the RedisBloom module and redis-py 4+)
import redis
from scrapy.dupefilters import BaseDupeFilter


class RedisBloomDupeFilter(BaseDupeFilter):
    """Deduplication via a Redis Bloom filter."""

    def __init__(self, server, key):
        self.server = server  # redis.Redis instance
        self.key = key        # name of the Bloom filter key

    def request_seen(self, request):
        # BF.ADD returns 0 if the item was (probably) already present, 1 if newly added
        added = self.server.bf().add(self.key, request.url)
        return added == 0

    def close(self, reason):
        pass
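For Scrapy to construct the filter itself it also needs a from_settings (or from_crawler) hook plus a DUPEFILTER_CLASS entry; a rough sketch, assuming a REDIS_URL setting and that the class lives in tutorial/dupefilters.py:

# dupefilters.py (sketch: add this classmethod to RedisBloomDupeFilter)
import redis
from scrapy.dupefilters import BaseDupeFilter

class RedisBloomDupeFilter(BaseDupeFilter):
    # ... __init__ / request_seen / close as above ...

    @classmethod
    def from_settings(cls, settings):
        # REDIS_URL is an assumed setting name
        server = redis.from_url(settings.get('REDIS_URL', 'redis://localhost:6379/0'))
        return cls(server, key='dupefilter:bloom')

# settings.py
# DUPEFILTER_CLASS = 'tutorial.dupefilters.RedisBloomDupeFilter'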

III. Distributed Crawling (Scrapy-Redis)

1. Installation

pip install scrapy-redis

2. Configuration

# settings.py
# Use the Redis-backed scheduler
SCHEDULER = 'scrapy_redis.scheduler.Scheduler'

# Keep the request queue between runs
SCHEDULER_PERSIST = True

# Use Redis-backed deduplication
DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'

# Redis connection
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_PASSWORD = None
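Instead of separate host/port/password settings, scrapy-redis also accepts a single connection URL; a minimal sketch (password and database number are illustrative):

# settings.py (alternative: one connection URL)
REDIS_URL = 'redis://:mypassword@localhost:6379/0'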

3. Distributed Spider

# distributed_spider.py
from scrapy_redis.spiders import RedisSpider


class DistributedSpider(RedisSpider):
    name = 'distributed'
    redis_key = 'distributed:start_urls'

    def parse(self, response):
        # Crawl logic
        yield {'url': response.url}

        # Follow newly discovered URLs (they go through the shared Redis scheduler)
        for url in response.css('a::attr(href)').getall():
            yield response.follow(url, callback=self.parse)

4. Running Multiple Crawler Instances

# Machine 1
scrapy runspider distributed_spider.py

# Machine 2
scrapy runspider distributed_spider.py

# Push a start URL into Redis
redis-cli lpush distributed:start_urls http://example.com/page1
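When you need to seed many start URLs at once, a short redis-py script is usually more convenient than redis-cli; a sketch, assuming Redis runs locally and using illustrative URLs:

# seed_urls.py (sketch: bulk-load start URLs into the scrapy-redis queue)
import redis

r = redis.Redis(host='localhost', port=6379, db=0)
urls = [f'http://example.com/page{i}' for i in range(1, 101)]  # illustrative URLs
r.lpush('distributed:start_urls', *urls)
print(f'Pushed {len(urls)} start URLs')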

IV. Performance Optimization

1. Concurrency Control

# settings.py
# Global number of concurrent requests
CONCURRENT_REQUESTS = 32

# Max concurrent requests per domain
CONCURRENT_REQUESTS_PER_DOMAIN = 16

# Max concurrent requests per IP
CONCURRENT_REQUESTS_PER_IP = 8

# AutoThrottle (adaptive rate limiting)
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 1
AUTOTHROTTLE_MAX_DELAY = 10
AUTOTHROTTLE_TARGET_CONCURRENCY = 2.0
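Roughly speaking, AutoThrottle nudges the per-slot download delay toward observed latency divided by AUTOTHROTTLE_TARGET_CONCURRENCY (see the Scrapy docs for the exact algorithm). A quick back-of-the-envelope check with the latency value below as an assumption:

# Sketch: the delay AutoThrottle converges toward for one download slot
latency = 0.5                 # observed average response latency in seconds (illustrative)
target_concurrency = 2.0      # AUTOTHROTTLE_TARGET_CONCURRENCY
target_delay = latency / target_concurrency
print(target_delay)           # 0.25 -> roughly 4 requests/second to that domain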

2. Download Timeout

# settings.py
DOWNLOAD_TIMEOUT = 15  # seconds per request; covers the whole download, proxy connections included

3. Connection Pool Reuse

# Use aiohttp instead of requests for async fetching; a shared ClientSession
# keeps a connection pool, so connections to the same host are reused.
import aiohttp


class AiohttpDownloader:
    async def fetch(self, url, session):
        async with session.get(url) as response:
            return await response.text()
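A minimal usage sketch (the URLs are illustrative): create one ClientSession, reuse it for every request, and fetch concurrently:

import asyncio
import aiohttp

async def main():
    urls = ['http://example.com/page1', 'http://example.com/page2']  # illustrative
    downloader = AiohttpDownloader()
    # One session for all requests -> one shared connection pool
    async with aiohttp.ClientSession() as session:
        pages = await asyncio.gather(*(downloader.fetch(url, session) for url in urls))
    print([len(p) for p in pages])

asyncio.run(main())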

V. Item Pipelines

1. Dedup Pipeline

# pipelines.py
from scrapy.exceptions import DropItem


class DuplicatesPipeline:
    def __init__(self):
        self.urls_seen = set()

    def process_item(self, item, spider):
        if item['url'] in self.urls_seen:
            raise DropItem(f'Duplicate item: {item}')
        self.urls_seen.add(item['url'])
        return item
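On long runs the seen-URL set can grow large; one common variation (a sketch, not part of the original pipeline) is to store fixed-size hashes instead of full URL strings:

import hashlib

from scrapy.exceptions import DropItem


class HashedDuplicatesPipeline:
    """Same idea as above, but keeps 16-byte digests instead of full URLs."""

    def __init__(self):
        self.fingerprints = set()

    def process_item(self, item, spider):
        fp = hashlib.md5(item['url'].encode('utf-8')).digest()
        if fp in self.fingerprints:
            raise DropItem(f'Duplicate item: {item["url"]}')
        self.fingerprints.add(fp)
        return item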

2. Cleaning Pipeline

from datetime import datetime


class CleanPipeline:
    def process_item(self, item, spider):
        item['title'] = item['title'].strip()
        item['content'] = item['content'].replace('\n', '')
        item['publish_date'] = self.parse_date(item['publish_date'])
        return item

    def parse_date(self, date_str):
        # Try a few common date formats and normalize to YYYY-MM-DD
        for fmt in ['%Y-%m-%d', '%Y/%m/%d', '%m/%d/%Y']:
            try:
                return datetime.strptime(date_str, fmt).strftime('%Y-%m-%d')
            except ValueError:
                continue
        return date_str

3. MongoDB Storage

import pymongo


class MongoPipeline:
    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DB')
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        # Assumes the item class defines a collection_name attribute
        self.db[item.collection_name].insert_one(dict(item))
        return item
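The pipeline above reads two settings (MONGO_URI and MONGO_DB, the names used in the snippet itself); a minimal settings sketch with an illustrative database name:

# settings.py
MONGO_URI = 'mongodb://localhost:27017'
MONGO_DB = 'news'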

4. MySQL Storage

import pymysql


class MysqlPipeline:
    def __init__(self, host, port, user, password, db):
        self.conn = pymysql.connect(
            host=host, port=port, user=user,
            password=password, database=db, charset='utf8mb4'
        )

    @classmethod
    def from_crawler(cls, crawler):
        # Connection parameters come from settings (the MYSQL_* names are assumptions)
        s = crawler.settings
        return cls(
            host=s.get('MYSQL_HOST', 'localhost'),
            port=s.getint('MYSQL_PORT', 3306),
            user=s.get('MYSQL_USER', 'root'),
            password=s.get('MYSQL_PASSWORD', ''),
            db=s.get('MYSQL_DB', 'scrapy'),
        )

    def close_spider(self, spider):
        self.conn.close()

    def process_item(self, item, spider):
        with self.conn.cursor() as cursor:
            sql = '''
                INSERT INTO articles (title, url, content, created_at)
                VALUES (%s, %s, %s, %s)
            '''
            cursor.execute(sql, (
                item['title'], item['url'],
                item['content'], item['created_at']
            ))
        self.conn.commit()
        return item
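To chain several pipelines, register them in ITEM_PIPELINES; lower numbers run first, so dedup and cleaning should precede storage. A sketch, assuming all the classes above live in tutorial/pipelines.py:

# settings.py (assumed module path; lower number = earlier in the chain)
ITEM_PIPELINES = {
    'tutorial.pipelines.DuplicatesPipeline': 100,
    'tutorial.pipelines.CleanPipeline': 200,
    'tutorial.pipelines.MongoPipeline': 300,
    'tutorial.pipelines.MysqlPipeline': 310,
}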

VI. Deployment

1. Using Scrapyd

# Install
pip install scrapyd

# Start the daemon
scrapyd

2. Deploying a Spider

# Package and deploy (scrapyd-deploy ships with the scrapyd-client package)
scrapyd-deploy local -p myproject

# Or deploy through the API
curl http://localhost:6800/addversion.json \
-F project=myproject \
-F version=1 \
-F egg=@myproject.egg
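scrapyd-deploy reads its targets from the project's scrapy.cfg; a minimal sketch matching the command above (the target name local and the URL are assumptions):

# scrapy.cfg
[settings]
default = myproject.settings

[deploy:local]
url = http://localhost:6800/
project = myproject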

3. Managing Spiders

# List projects
curl http://localhost:6800/listprojects.json

# Schedule (start) a spider
curl http://localhost:6800/schedule.json -d project=myproject -d spider=myspider

# Cancel a running job
curl http://localhost:6800/cancel.json -d project=myproject -d job=xxx

4. Using Gerapy

# Gerapy: web UI for deploying and managing Scrapy projects
pip install gerapy

gerapy init
cd gerapy
gerapy migrate
gerapy runserver

VII. Hands-On: A Distributed News Crawler

# spiders/news_spider.py
from scrapy_redis.spiders import RedisSpider


class NewsSpider(RedisSpider):
    name = 'news'
    redis_key = 'news:start_urls'

    custom_settings = {
        'CONCURRENT_REQUESTS': 32,
        'DOWNLOAD_DELAY': 0.5,
        'AUTOTHROTTLE_ENABLED': True,
    }

    def parse(self, response):
        # Parse the listing page
        for article in response.css('div.article-item'):
            yield {
                'title': article.css('h2 a::text').get(),
                'url': article.css('h2 a::attr(href)').get(),
                'date': article.css('span.date::text').get(),
            }

        # Follow the detail pages
        for detail_url in response.css('div.article-item h2 a::attr(href)').getall():
            yield response.follow(detail_url, self.parse_detail)

        # Pagination
        next_page = response.css('a.next::attr(href)').get()
        if next_page:
            yield response.follow(next_page, self.parse)

    def parse_detail(self, response):
        yield {
            'content': response.css('div.content::text').getall(),
            'author': response.css('span.author::text').get(),
            'source': response.css('span.source::text').get(),
        }

Configure the Redis scheduler:

# settings.py
REDIS_URL = 'redis://localhost:6379/0'
SCHEDULER = 'scrapy_redis.scheduler.Scheduler'
DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
SCHEDULER_PERSIST = True
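With the scheduler configured, each machine starts the spider (e.g. scrapy crawl news from inside the project) and then idles until a seed URL lands on the redis_key defined above. The URL here is illustrative:

# Start the crawl by seeding the Redis list the spider listens on
redis-cli lpush news:start_urls https://example.com/news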

VIII. Common Issues

Problem               Solution
Memory leaks          Set CLOSESPIDER_ITEMCOUNT to close the spider after N items
High CPU usage        Lower CONCURRENT_REQUESTS
Dead proxies          Use a proxy pool with automatic rotation
Getting banned        Rotate User-Agent headers and slow the crawl down
Incremental crawling  Track already-crawled URLs in Redis

IX. Full Configuration Example

# settings.py
BOT_NAME = 'news_spider'

CONCURRENT_REQUESTS = 32
DOWNLOAD_DELAY = 0.5
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 1
AUTOTHROTTLE_MAX_DELAY = 10

# Redis
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
SCHEDULER = 'scrapy_redis.scheduler.Scheduler'
DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
SCHEDULER_PERSIST = True

# Middleware (the custom ProxyMiddleware / RetryMiddleware from section I)
DOWNLOADER_MIDDLEWARES = {
    'news_spider.middlewares.ProxyMiddleware': 543,
    'news_spider.middlewares.RetryMiddleware': 544,
}

# Pipelines (RedisPipeline pushes scraped items into Redis)
ITEM_PIPELINES = {
    'scrapy_redis.pipelines.RedisPipeline': 300,
}

Summary

✅ Download middleware
✅ Request deduplication
✅ Distributed crawling (Scrapy-Redis)
✅ Performance optimization
✅ Storage pipelines (MongoDB/MySQL)
✅ Deployment (Scrapyd/Gerapy)

This wraps up the crawler series: from the requests basics all the way to distributed Scrapy!

#PythonCrawler #Scrapy #Distributed #Redis

