
An efficient concurrent crawler tool based on coroutines + sockets



I. Background

Because I frequently need to crawl and process data, I had been using a multithreading + requests approach, but it has two problems:

1. When there are many requests and the concurrency gets even slightly high, the machine becomes very sluggish.

2. Every time the request packet changes, e.g. switching from POST to GET, the headers and other fields have to be parsed out in a different way, which is inconvenient.

Mainly for these two reasons, I put together a socket + coroutine based tool that handles this kind of concurrent crawling efficiently.

II. The code

import asyncio
import binascii
import gzip
import socket
import ssl


class Spider(object):
    def __init__(
        self,
        parameter_iter,
        concurrent_num=1,
        request_data="",
        protocol="https",
        timeout=3,
        host="",
        port=0,
        data_type="hex",
    ):
        self.parameter_iter = parameter_iter
        self.concurrent_num = concurrent_num
        self.request_data = request_data
        self.encrypt = 1 if protocol == "https" else 0
        self.timeout = timeout
        self.host = host
        self.port = port
        self.data_type = data_type
        self.result = []
        # Decode the raw request depending on whether it is hex or a plain string (hex is rarely needed).
        if self.data_type == "hex":
            self.request_data = self.request_data.strip()
            self.request_data = binascii.unhexlify(self.request_data).strip() + b"\r\n\r\n"
        else:
            self.request_data = self.request_data.strip() + "\r\n\r\n"
            self.request_data = self.request_data.encode(
                encoding="UTF-8", errors="strict"
            )
        # If host/port were not given, extract them from the Host header of the request packet.
        if not self.host or not self.port:
            try:
                for header in self.request_data.split(b"\n"):
                    if header.startswith(b"Host:"):
                        host = header.split(b" ")[1].strip()
                        if host.find(b":") > -1:  # explicit port, e.g. Host: example.com:8080
                            self.host, self.port = host.split(b":")
                        elif self.encrypt:
                            self.host = host
                            self.port = 443
                        else:
                            self.host = host
                            self.port = 80
                        break
            except Exception as e:
                print("parse error %s" % e)

    async def send_request(self, parameter=""):

        if self.encrypt:  # for https and other TLS-wrapped protocols, wrap the socket with TLS
            hostname = self.host.decode() if isinstance(self.host, bytes) else self.host
            sock = ssl.create_default_context().wrap_socket(
                socket.socket(), server_hostname=hostname
            )
        else:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(self.timeout)
        try:
            sock.connect((self.host, int(self.port)))
        except Exception as e:
            print("connect error %s" % e)
            sock.close()
            return
        # Substitute the current parameter into the *##* placeholder and send the raw request.
        sock.sendall(self.request_data.replace(b"*##*", str(parameter).encode("utf-8")))
        response = b""
        response_code = -1
        compression_flag = 0
        chunk_flag = 1
        content_length = -1
        parse_header_flag = 1
        while True:
            try:
                data = sock.recv(1024)
            except socket.timeout:
                break
            if not data:  # the server closed the connection
                break
            response += data
            # Once the header/body separator has arrived, parse the status line and headers.
            if parse_header_flag and response.find(b"\r\n\r\n") > -1:
                parse_header_flag = 0
                response_code = response.split(b"\r\n")[0].split(b" ")[1]
                header_list, response = response.split(b"\r\n\r\n", 1)
                for header in header_list.split(b"\r\n"):
                    if header.startswith(b"Content-Length:"):  # body length tells us when to stop receiving
                        chunk_flag = 0
                        content_length = int(header.split(b" ")[1])
                    elif header == b"Content-Encoding: gzip":  # body is gzip-compressed
                        compression_flag = 1
            # Two stop conditions:
            # 1. chunked transfer (no Content-Length): stop at the terminating 0-length chunk
            # 2. Content-Length present: stop once the body reaches that length
            if chunk_flag == 1 and response.strip().endswith(b"\r\n0"):
                break
            if content_length == len(response):
                break
        sock.close()
        # Decode the chunked transfer encoding, if any.
        parse_data = b""
        if chunk_flag:
            try:
                # Each chunk is "<hex size>\r\n<data>\r\n"; a zero-size chunk ends the body.
                while response:
                    data_len, left_data = response.split(b"\r\n", 1)
                    data_len = int(data_len, 16)
                    if data_len == 0:
                        break
                    parse_data += left_data[:data_len]
                    response = left_data[data_len + 2 :]
                response = parse_data
            except Exception as e:
                print("chunk parse error %s" % e)
        # Decompress the body if the server gzip-compressed it.
        if compression_flag:
            response = gzip.decompress(response)
        self.result.append((response_code, response))

    async def spider_request(self):
        # Each worker keeps pulling parameters from the shared iterator until it is exhausted.
        while True:
            try:
                data = next(self.parameter_iter)
                await self.send_request(parameter=data)
            except StopIteration:
                return

    async def main(self):
        task_list = []
        for _ in range(self.concurrent_num):
            task_list.append(asyncio.create_task(self.spider_request()))
        for task in task_list:
            await task

    def run(self):
        asyncio.run(self.main())
        return self.result


if __name__ == "__main__":
    # The two _request_data values below are equivalent.
    _request_data = """GET / HTTP/1.1
Host: www.baidu.com
User-Agent: curl/7.64.1
Accept: */*"""
    # _request_data = "474554202f20485454502f312e310a486f73743a207777772e62616964752e636f6d0a557365722d4167656e743a206375726c2f372e36342e310a4163636570743a202a2f2a0d0a"
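    # Illustrative note: a hex-format request like the commented one above can be produced
    # from the plain-text form with binascii.hexlify(_request_data.encode("utf-8")).decode();
    # Spider strips and re-terminates both forms the same way before sending.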
    _concurrent_num = 2  # number of concurrent requests, used to control the crawl speed
    _parameter_iter = iter(range(10))  # parameters substituted dynamically during the crawl; the placeholder in the request is *##*
    _protocol = "http"  # protocol used for the requests
    _timeout = 3  # timeout for a single request, in seconds
    _data_type = "str"  # request packet encoding, either "hex" or "str"
    result = Spider(
        parameter_iter=_parameter_iter,
        concurrent_num=_concurrent_num,
        request_data=_request_data,
        protocol=_protocol,
        timeout=_timeout,
        data_type=_data_type,
    ).run()
    for info in result:
        print(info)
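
For illustration, here is a hypothetical request template that actually exercises the *##* placeholder (the example above never substitutes it). Every value produced by parameter_iter replaces *##* before the packet is sent, so with iter(range(10)) the requests would fetch /item?id=0 through /item?id=9; the host and path are made up:

_request_data = """GET /item?id=*##* HTTP/1.1
Host: www.example.com
User-Agent: curl/7.64.1
Accept: */*"""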

 

III. Advantages:

1. For http/https crawling there is no need to parse the layer-7 packet yourself; you only swap in a different _request_data, so the tool is very general.

2. Being coroutine based, performance is a bit better (see the sketch after this list).

3. Because it sits directly on sockets, it is in principle easy to extend to support other protocols.

4. Request packets can be supplied either as a plain string or in hex.
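
To get the full benefit of the coroutine model, the blocking socket calls above can be swapped for asyncio's non-blocking streams, which is what lets many coroutines overlap their network I/O. A minimal sketch under that assumption (the function and parameter names are illustrative and not part of the tool above):

import asyncio


async def fetch_raw(host, port, raw_request, use_ssl=False, timeout=3):
    # Open a non-blocking connection; ssl=True enables TLS with default settings.
    reader, writer = await asyncio.wait_for(
        asyncio.open_connection(host, port, ssl=use_ssl), timeout
    )
    writer.write(raw_request)  # queue the raw HTTP request bytes
    await writer.drain()  # wait until the data has actually been flushed
    # Read until the server closes the connection; this suits "Connection: close"
    # responses, while keep-alive responses still need the Content-Length/chunked
    # logic shown in the class above.
    response = await asyncio.wait_for(reader.read(-1), timeout)
    writer.close()
    await writer.wait_closed()
    return response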

IV. Key techniques involved

1. How to tell that sock.recv has received the complete response (only the http/https cases of Content-Length and chunked transfer are handled here).

2. Decoding chunked transfer encoding (both points are illustrated by the sketch below).
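
To make these two points concrete, here is a minimal standalone sketch (not the exact code of the class above; the function name is illustrative). When a Content-Length header is present, receiving simply stops once the body has that many bytes; with chunked transfer there is no Content-Length, so receiving stops at the terminating zero-length chunk and the body is then decoded like this:

def decode_chunked(body: bytes) -> bytes:
    # Each chunk is "<hex size>\r\n<data>\r\n"; a zero-size chunk marks the end of
    # the body, which doubles as the "response is complete" signal when the server
    # did not send a Content-Length header.
    decoded = b""
    while body:
        size_line, body = body.split(b"\r\n", 1)
        size = int(size_line.split(b";")[0], 16)  # chunk extensions after ";" are ignored
        if size == 0:
            break  # terminating chunk: the body is complete
        decoded += body[:size]
        body = body[size + 2:]  # skip the chunk data plus its trailing \r\n
    return decoded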

 


Original post: https://www.cnblogs.com/xugongzi007/p/14453711.html
