码迷,mamicode.com
首页 > 其他好文 > 详细

分布式爬虫

时间:2018-11-02 23:44:09      阅读:142      评论:0      收藏:0      [点我收藏+]

标签:rac   run   bind   机器   字符串   python模块   scheduler   serialize   爬虫   

一,介绍

1.scrapy框架为何不能实现分布式?

  其一:因为多台机器上部署的scrapy会各自拥有各自的调度器,这样就使得多台机器无法分配start_urls列表中的url。(多台机器无法共享同一个调度器)

  其二:多台机器爬取到的数据无法通过同一个管道对数据进行统一的数据持久出存储。(多台机器无法共享同一个管道)

2.scrapy_redis实现原理

原来scrapy的Scheduler维护的是本机的任务队列(存放Request对象及其回调函数等信息)+本机的去重队列(存放访问过的url地址)

技术分享图片

所以实现分布式爬取的关键就是,找一台专门的主机上运行一个共享的队列比如Redis,
然后重写Scrapy的Scheduler,让新的Scheduler到共享队列存取Request,并且去除重复的Request请求,所以总结下来,实现分布式的关键就是三点:

#1、共享队列
#2、重写Scheduler,让其无论是去重还是任务都去访问共享队列
#3、为Scheduler定制去重规则(利用redis的集合类型)

以上三点便是scrapy-redis组件的核心功能

技术分享图片

安装:
pip3 install scrapy-redis

#源码:
D:\python3.6\Lib\site-packages\scrapy_redis

二,使用

1.安装scrapy-redis组件:

    - pip install scrapy-redis

    - scrapy-redis是基于scrapy框架开发出的一套组件,其作用就是可以让scrapy实现分布式爬虫。

2.编写爬虫文件:

    - 同之前scrapy中基于Spider或者CrawlSpider的编写方式一致。

3.编写管道文件:

    - 在scrapy-redis组件中已经帮助我们封装好了一个专门用于连接存储redis数据库的管道(RedisPipeline),因此我们直接使用即可,无需自己编写管道文件。

4.编写配置文件:

    - 在settings.py中开启管道,且指定使用scrapy-redis中封装好的管道。

ITEM_PIPELINES = {
    scrapy_redis.pipelines.RedisPipeline: 400
}

- 该管道默认会连接且将数据存储到本机的redis服务中,如果想要连接存储到其他redis服务中需要在settings.py中进行如下配置:

REDIS_HOST = redis服务的ip地址
REDIS_PORT = 6379
REDIS_ENCODING = ‘utf-8’
REDIS_PARAMS = {‘password’:’123456’}

注意:redis中的配置需要修改

.对redis配置文件进行配置:

- 注释该行:bind 127.0.0.1,表示可以让其他ip访问redis

- 将yes该为no:protected-mode no,表示可以让其他ip操作redis

 

ex:redis实现分布式基本流程

爬虫文件:

# -*- coding: utf-8 -*-
import scrapy
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule
from scrapy_redis.spiders import RedisCrawlSpider
from redisScrapyPro.items import RedisscrapyproItem


# -1.将redis数据库的配置文件进行改动: protected-mode no        #bind 127.0.0.1
# 0.下载scrapy-redis
# 1.创建工程
# 2.创建基于scrawlSpider的爬虫文件
# 3.导入RedisCrawlSpider类
# 4.将start_urls更换成redis_key属性
# 5.在现有代码的基础上进行连接提取和解析操作
# 6.将解析的数据值封装到item中,然后将item对象提交到scrapy-redis组件中的管道里(‘scrapy_redis.pipelines.RedisPipeline‘: 400,)
# 7.管道会将数据值写入到指定的redis数据库中(在配置文件中进行指定redis数据库ip的编写)
# 8.在当前工程中使用scrapy-redis封装好的调度器(在配置文件中进行配置)
# 9.将起始url扔到调度器队列(redis_key)中
# 10.启动redis服务器:redis-server redis.windows.conf
# 11.启动redis-cli
# 12.执行当前爬虫文件:scrapy runspider 爬虫文件
# 13.向队列中扔一个起始url:在redis-cli执行扔的操作(lpush redis_key的value值 起始url)
class RedisdemoSpider(RedisCrawlSpider):
    name = redisDemo
    # allowed_domains = [‘www.xxx.com‘]
    # start_urls = [‘http://www.xxx.com/‘]

    # scrapy_redis的调度器队列的名称,最终我们会根据该队列的名称向调度器队列中扔一个起始url
    redis_key = "redisQueue"

    link = LinkExtractor(allow=r/pic/page/\d+\?s=\d+)
    link1 = LinkExtractor(allow=r/pic/page/1)
    rules = (
        Rule(link, callback=parse_item, follow=True),
        Rule(link1, callback=parse_item, follow=True),
    )

    def parse_item(self, response):
        div_list = response.xpath(//div[@class="thumb"])
        for div in div_list:
            img_url = "https:" + div.xpath(./a/img/@src).extract_first()
            item = RedisscrapyproItem()
            item[imgUrl] = img_url

            yield item

 使用scrapy-redis组件中封装好的调度器,将所有的url存储到该指定的调度器中,从而实现了多台机器的调度器共享。

# 使用scrapy-redis组件的去重队列
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
# 使用scrapy-redis组件自己的调度器
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# 是否允许暂停
SCHEDULER_PERSIST = True

 - 使用scrapy-redis组件中封装好的管道,将每台机器爬取到的数据存储通过该管道存储到redis数据库中,从而实现了多台机器的管道共享。

ITEM_PIPELINES = {

   scrapy_redis.pipelines.RedisPipeline: 400,
}

- 执行:scrapy runspider xxx.py,然后向调度器队列中传入起始url:lpush nnspider:start_urls "http://www.xxx.com/"

三,组件相关

以下代码来自:http://www.cnblogs.com/linhaifeng/articles/8359774.html

1、只使用scrapy-redis的去重功能

技术分享图片 

技术分享图片
#一、源码:D:\python3.6\Lib\site-packages\scrapy_redis\dupefilter.py



#二、配置scrapy使用redis提供的共享去重队列

#2.1 在settings.py中配置链接Redis
REDIS_HOST = localhost                            # 主机名
REDIS_PORT = 6379                                   # 端口
REDIS_URL = redis://user:pass@hostname:9001       # 连接URL(优先于以上配置)
REDIS_PARAMS  = {}                                  # Redis连接参数
REDIS_PARAMS[redis_cls] = myproject.RedisClient # 指定连接Redis的Python模块
REDIS_ENCODING = "utf-8"                            # redis编码类型  
# 默认配置:D:\python3.6\Lib\site-packages\scrapy_redis\defaults.py


#2.2 让scrapy使用共享的去重队列
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
#使用scrapy-redis提供的去重功能,查看源码会发现是基于Redis的集合实现的


#2.3、需要指定Redis中集合的key名,key=存放不重复Request字符串的集合
DUPEFILTER_KEY = dupefilter:%(timestamp)s
#源码:dupefilter.py内一行代码key = defaults.DUPEFILTER_KEY % {‘timestamp‘: int(time.time())}


#2.4、去重规则源码分析dupefilter.py
def request_seen(self, request):
    """Returns True if request was already seen.

    Parameters
    ----------
    request : scrapy.http.Request

    Returns
    -------
    bool

    """
    fp = self.request_fingerprint(request) 
    # This returns the number of values added, zero if already exists.
    added = self.server.sadd(self.key, fp)
    return added == 0


#2.5、将request请求转成一串字符后再存入集合

from scrapy.http import Request
from scrapy.utils.request import request_fingerprint

req = Request(url=http://www.baidu.com)
result=request_fingerprint(req)
print(result) #75d6587d87b3f4f3aa574b33dbd69ceeb9eafe7b


#2.6、注意:
    - URL参数位置不同时,计算结果一致;
    - 默认请求头不在计算范围,include_headers可以设置指定请求头
    - 示范:
    from scrapy.utils import request
    from scrapy.http import Request
     
    req = Request(url=http://www.baidu.com?name=8&id=1,callback=lambda x:print(x),cookies={k1:vvvvv})
    result1 = request.request_fingerprint(req,include_headers=[cookies,])
     
    print(result)
     
    req = Request(url=http://www.baidu.com?id=1&name=8,callback=lambda x:print(x),cookies={k1:666})
     
    result2 = request.request_fingerprint(req,include_headers=[cookies,])
     
    print(result1 == result2) #True
使用共享去重队列+源码分析

2、使用scrapy-redis的去重+调度实现分布式爬取

技术分享图片
#1、源码:D:\python3.6\Lib\site-packages\scrapy_redis\scheduler.py


#2、settings.py配置

# Enables scheduling storing requests queue in redis.
SCHEDULER = "scrapy_redis.scheduler.Scheduler"       

# 调度器将不重复的任务用pickle序列化后放入共享任务队列,默认使用优先级队列(默认),其他:PriorityQueue(有序集合),FifoQueue(列表)、LifoQueue(列表)               
SCHEDULER_QUEUE_CLASS = scrapy_redis.queue.PriorityQueue          

# 对保存到redis中的request对象进行序列化,默认使用pickle
SCHEDULER_SERIALIZER = "scrapy_redis.picklecompat"   

# 调度器中请求任务序列化后存放在redis中的key               
SCHEDULER_QUEUE_KEY = %(spider)s:requests    

# 是否在关闭时候保留原来的调度器和去重记录,True=保留,False=清空                     
SCHEDULER_PERSIST = True       

# 是否在开始之前清空 调度器和去重记录,True=清空,False=不清空                                     
SCHEDULER_FLUSH_ON_START = False    

# 去调度器中获取数据时,如果为空,最多等待时间(最后没数据,未获取到)。如果没有则立刻返回会造成空循环次数过多,cpu占用率飙升                                
SCHEDULER_IDLE_BEFORE_CLOSE = 10           

# 去重规则,在redis中保存时对应的key                         
SCHEDULER_DUPEFILTER_KEY = %(spider)s:dupefilter      

# 去重规则对应处理的类,将任务request_fingerprint(request)得到的字符串放入去重队列            
SCHEDULER_DUPEFILTER_CLASS = scrapy_redis.dupefilter.RFPDupeFilter
View Code

3、持久化

#从目标站点获取并解析出数据后保存成item对象,会由引擎交给pipeline进行持久化/保存到数据库,scrapy-redis提供了一个pipeline组件,可以帮我们把item存到redis中
     
#1、将item持久化到redis时,指定key和序列化函数 
REDIS_ITEMS_KEY = %(spider)s:items
REDIS_ITEMS_SERIALIZER = json.dumps
 
#2、使用列表保存item数据

4、从Redis中获取起始URL

scrapy程序爬取目标站点,一旦爬取完毕后就结束了,如果目标站点更新内容了,我们想重新爬取,那么只能再重新启动scrapy,非常麻烦
scrapy-redis提供了一种供,让scrapy从redis中获取起始url,如果没有scrapy则过一段时间再来取而不会关闭
这样我们就只需要写一个简单的脚本程序,定期往redis队列里放入一个起始url。

#具体配置如下

#1、编写爬虫时,起始URL从redis的Key中获取
REDIS_START_URLS_KEY = %(name)s:start_urls
    
#2、获取起始URL时,去集合中获取还是去列表中获取?True,集合;False,列表
REDIS_START_URLS_AS_SET = False    # 获取起始URL时,如果为True,则使用self.server.spop;如果为False,则使用self.server.lpop

 

分布式爬虫

标签:rac   run   bind   机器   字符串   python模块   scheduler   serialize   爬虫   

原文地址:https://www.cnblogs.com/glh-ty/p/9898588.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!