day_7: Using Proxies
Published: 2019-06-08


1. The Proxy Pool

The proxy pool is split into four parts: a storage module, a getter module, a tester module, and an interface (API) module. The getter, tester, and API each run in their own process.

Storage module: stores the crawled proxies, keeps them free of duplicates, marks each proxy's usability, and updates every proxy dynamically in real time (backed by a Redis sorted set).

Getter module: periodically crawls fresh proxies from the major free-proxy websites.

Tester module: periodically tests the proxies in the database against a test URL and marks each proxy's status with a score (100 means usable; the lower the score, the less usable the proxy).

Interface module: exposes the pool to the outside through a web API; proxies are picked at random, which also spreads the load.
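To make the storage and scoring scheme concrete before diving into the code, here is a minimal redis-py sketch (assuming a local Redis at 127.0.0.1:6379; the sample IP is made up, and the key name 'proxies' and the scores 10/100/0 match the conf.py shown later):

import redis

db = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)

db.zadd('proxies', {'123.56.78.90:8888': 10})   # a newly crawled proxy starts at score 10
db.zincrby('proxies', -1, '123.56.78.90:8888')  # a failed test subtracts one point
db.zadd('proxies', {'123.56.78.90:8888': 100})  # a passed test resets the score to 100
print(db.zrangebyscore('proxies', 100, 100))    # the API prefers proxies at full score
db.zrem('proxies', '123.56.78.90:8888')         # a proxy that drops to 0 is removed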

Program flow:

    getter --(test)--> storage <----> scheduled tester
                          |
                     external API

Project layout:

|- proxypool
|-- crawler.py   getter module
|-- db.py        storage module
|-- tester.py    tester module
|-- conf.py      configuration file
|-- api.py       API module
|-- run.py       entry point

2. Implementing the Proxy Pool

Getter module: crawler.py

import re
import requests
from db import RedisClient
from pyquery import PyQuery as pq
from conf import *
from requests.exceptions import ConnectionError


class ProxyMetaclass(type):
    """
    Metaclass: collects every crawl_-prefixed method of the class it builds.
    """
    def __new__(cls, name, bases, attrs):
        """
        :param name:
        :param bases:
        :param attrs: all attributes of the class, keyed by attribute name
        :return:
        """
        count = 0
        attrs['__CrawlFunc__'] = []
        # walk the attributes and keep the methods whose names start with crawl_
        for k, v in attrs.items():
            if 'crawl_' in k:
                attrs['__CrawlFunc__'].append(k)
                count += 1
        attrs['__CrawlFuncCount__'] = count
        return type.__new__(cls, name, bases, attrs)


class Crawler(object, metaclass=ProxyMetaclass):
    base_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.71 Safari/537.36',
        'Accept-Encoding': 'gzip, deflate, sdch',
        'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7'
    }

    def get_page(self, url, options={}):
        """
        Fetch a page.
        :param url: request URL
        :param options: extra request headers
        :return: response text, or None on failure
        """
        headers = dict(Crawler.base_headers, **options)
        print('Crawling', url)
        try:
            response = requests.get(url, headers=headers)
            if response.status_code == 200:
                print('Crawled successfully', url, response.status_code)
                return response.text
        except ConnectionError:
            print('Crawling failed', url)
            return None

    def get_proxies(self, callback):
        """
        Call one of Crawler's crawl_-prefixed methods.
        :param callback: method name
        :return: list of proxies
        """
        proxies = []
        for proxy in eval("self.{}()".format(callback)):
            print('Got proxy', proxy)
            proxies.append(proxy)
        return proxies

    def crawl_daili66(self, page_count=4):
        """
        Crawl www.66ip.cn.
        :param page_count: number of pages
        :return: proxies
        """
        start_url = 'http://www.66ip.cn/{}.html'
        urls = [start_url.format(page) for page in range(1, page_count + 1)]
        for url in urls:
            print('Crawling', url)
            html = self.get_page(url)
            if html:
                doc = pq(html)
                trs = doc('.containerbox table tr:gt(0)').items()
                for tr in trs:
                    ip = tr.find('td:nth-child(1)').text()
                    port = tr.find('td:nth-child(2)').text()
                    yield ':'.join([ip, port])

    def crawl_ip3366(self):
        for page in range(1, 4):
            start_url = 'http://www.ip3366.net/free/?stype=1&page={}'.format(page)
            html = self.get_page(start_url)
            ip_address = re.compile('<tr>\s*<td>(.*?)</td>\s*<td>(.*?)</td>')
            # the \s* parts absorb the whitespace (including line breaks) between cells
            re_ip_address = ip_address.findall(html)
            for address, port in re_ip_address:
                result = address + ':' + port
                yield result.replace(' ', '')

    def crawl_kuaidaili(self):
        for i in range(1, 4):
            start_url = 'http://www.kuaidaili.com/free/inha/{}/'.format(i)
            html = self.get_page(start_url)
            if html:
                ip_address = re.compile('<td data-title="IP">(.*?)</td>')
                re_ip_address = ip_address.findall(html)
                port = re.compile('<td data-title="PORT">(.*?)</td>')
                re_port = port.findall(html)
                for address, port in zip(re_ip_address, re_port):
                    address_port = address + ':' + port
                    yield address_port.replace(' ', '')

    def crawl_xicidaili(self):
        for i in range(1, 3):
            start_url = 'http://www.xicidaili.com/nn/{}'.format(i)
            headers = {
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
                'Cookie': '_free_proxy_session=BAh7B0kiD3Nlc3Npb25faWQGOgZFVEkiJWRjYzc5MmM1MTBiMDMzYTUzNTZjNzA4NjBhNWRjZjliBjsAVEkiEF9jc3JmX3Rva2VuBjsARkkiMUp6S2tXT3g5a0FCT01ndzlmWWZqRVJNek1WanRuUDBCbTJUN21GMTBKd3M9BjsARg%3D%3D--2a69429cb2115c6a0cc9a86e0ebe2800c0d471b3',
                'Host': 'www.xicidaili.com',
                'Referer': 'http://www.xicidaili.com/nn/3',
                'Upgrade-Insecure-Requests': '1',
            }
            html = self.get_page(start_url, options=headers)
            if html:
                find_trs = re.compile('<tr class.*?>(.*?)</tr>', re.S)
                trs = find_trs.findall(html)
                for tr in trs:
                    find_ip = re.compile('<td>(\d+\.\d+\.\d+\.\d+)</td>')
                    re_ip_address = find_ip.findall(tr)
                    find_port = re.compile('<td>(\d+)</td>')
                    re_port = find_port.findall(tr)
                    for address, port in zip(re_ip_address, re_port):
                        address_port = address + ':' + port
                        yield address_port.replace(' ', '')

    def crawl_ip3366_home(self):
        # the original post defined this as a second crawl_ip3366, which would have
        # shadowed the method above; renamed so that both parsers get registered
        for i in range(1, 4):
            start_url = 'http://www.ip3366.net/?stype=1&page={}'.format(i)
            html = self.get_page(start_url)
            if html:
                find_tr = re.compile('<tr>(.*?)</tr>', re.S)
                trs = find_tr.findall(html)
                for s in range(1, len(trs)):
                    find_ip = re.compile('<td>(\d+\.\d+\.\d+\.\d+)</td>')
                    re_ip_address = find_ip.findall(trs[s])
                    find_port = re.compile('<td>(\d+)</td>')
                    re_port = find_port.findall(trs[s])
                    for address, port in zip(re_ip_address, re_port):
                        address_port = address + ':' + port
                        yield address_port.replace(' ', '')

    def crawl_iphai(self):
        start_url = 'http://www.iphai.com/'
        html = self.get_page(start_url)
        if html:
            find_tr = re.compile('<tr>(.*?)</tr>', re.S)
            trs = find_tr.findall(html)
            for s in range(1, len(trs)):
                find_ip = re.compile('<td>\s+(\d+\.\d+\.\d+\.\d+)\s+</td>', re.S)
                re_ip_address = find_ip.findall(trs[s])
                find_port = re.compile('<td>\s+(\d+)\s+</td>', re.S)
                re_port = find_port.findall(trs[s])
                for address, port in zip(re_ip_address, re_port):
                    address_port = address + ':' + port
                    yield address_port.replace(' ', '')

    def crawl_data5u(self):
        start_url = 'http://www.data5u.com/free/gngn/index.shtml'
        headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7',
            'Cache-Control': 'max-age=0',
            'Connection': 'keep-alive',
            'Cookie': 'JSESSIONID=47AA0C887112A2D83EE040405F837A86',
            'Host': 'www.data5u.com',
            'Referer': 'http://www.data5u.com/free/index.shtml',
            'Upgrade-Insecure-Requests': '1',
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.108 Safari/537.36',
        }
        html = self.get_page(start_url, options=headers)
        if html:
            ip_address = re.compile('<span><li>(\d+\.\d+\.\d+\.\d+)</li>.*?<li class="port.*?>(\d+)</li>', re.S)
            re_ip_address = ip_address.findall(html)
            for address, port in re_ip_address:
                result = address + ':' + port
                yield result.replace(' ', '')


class GetProxy():
    def __init__(self):
        self.redis = RedisClient()
        self.crawler = Crawler()

    def is_over_threshold(self):
        """
        Check whether the pool has reached its size limit.
        """
        return True if self.redis.count() >= POOL_UPPER_THRESHOLD else False

    def run(self):
        print('Getter starts running')
        if not self.is_over_threshold():
            for callback_label in range(self.crawler.__CrawlFuncCount__):
                callback = self.crawler.__CrawlFunc__[callback_label]
                # fetch proxies from this source
                proxies = self.crawler.get_proxies(callback)
                for proxy in proxies:
                    self.redis.add(proxy)

Three classes:

• ProxyMetaclass (metaclass): picks out the methods of Crawler whose names start with crawl_ (a minimal demo of this registration trick follows the list)
• Crawler: the crawler class
  1. get_page: fetches a page
  2. get_proxies: calls a given crawl_-prefixed method and returns the proxies as a list
  3. the remaining methods are parsers for the individual proxy sites
• GetProxy
  1. __init__: initializes the Redis client and the Crawler
  2. is_over_threshold: checks whether the pool has reached its size limit
  3. run: calls every crawl_-prefixed method of Crawler and stores the resulting proxies in Redis
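To isolate the registration trick mentioned above, here is a minimal, self-contained sketch (the class and method names are made up for illustration): every crawl_-prefixed method is recorded in __CrawlFunc__ the moment the class is created, which is how GetProxy.run later discovers all parsers without a hand-maintained list.

class DemoMeta(type):
    def __new__(cls, name, bases, attrs):
        # record the names of all crawl_-prefixed methods at class-creation time
        attrs['__CrawlFunc__'] = [k for k in attrs if k.startswith('crawl_')]
        return type.__new__(cls, name, bases, attrs)


class DemoCrawler(metaclass=DemoMeta):
    def crawl_example(self):
        yield '127.0.0.1:8080'  # placeholder proxy, not a real source

    def helper(self):
        pass


print(DemoCrawler.__CrawlFunc__)  # ['crawl_example'] -- helper is not registered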

Storage module: db.py

• Storage: a Redis sorted set (ZSet); each member is an IP:PORT string with an integer score
• Score: maximum 100, newly added proxies start at 10, minimum 0; every failed test subtracts 1, and a proxy is deleted once it reaches 0
import redis
from conf import *
from random import choice
import re


class PoolEmptyError(Exception):
    def __init__(self):
        Exception.__init__(self)

    def __str__(self):
        return repr('The proxy pool has run dry')


class RedisClient(object):
    def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD):
        """
        Initialize the client.
        :param host: Redis host
        :param port: Redis port
        :param password: Redis password
        """
        self.db = redis.Redis(host=host, port=port, password=password, decode_responses=True)

    def add(self, proxy, score=INITIAL_SCORE):
        """
        Add a proxy with its initial score.
        :param proxy: proxy
        :param score: score
        :return: result of the add
        """
        if not re.match('\d+\.\d+\.\d+\.\d+\:\d+', proxy):
            print('Proxy is malformed', proxy, 'discarded')
            return
        if not self.db.zscore(REDIS_KEY, proxy):
            return self.db.zadd(REDIS_KEY, {proxy: score})

    def random(self):
        """
        Get a random usable proxy: try the top-score proxies first, fall back to the
        highest-ranked ones, otherwise raise an error.
        :return: a random proxy
        """
        result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
        if len(result):
            return choice(result)
        else:
            result = self.db.zrevrange(REDIS_KEY, 0, 100)
            if len(result):
                return choice(result)
            else:
                raise PoolEmptyError

    def decrease(self, proxy):
        """
        Subtract one point from a proxy; delete it if it falls to the minimum score.
        :param proxy: proxy
        :return: the updated score
        """
        score = self.db.zscore(REDIS_KEY, proxy)
        if score and score > MIN_SCORE:
            print('Proxy', proxy, 'current score', score, 'minus 1')
            return self.db.zincrby(REDIS_KEY, -1, proxy)
        else:
            print('Proxy', proxy, 'current score', score, 'removed')
            return self.db.zrem(REDIS_KEY, proxy)

    def exists(self, proxy):
        """
        Check whether a proxy exists.
        :param proxy: proxy
        :return: whether it exists
        """
        return not self.db.zscore(REDIS_KEY, proxy) == None

    def max(self, proxy):
        """
        Set a proxy's score to MAX_SCORE.
        :param proxy: proxy
        :return: result of the update
        """
        print('Proxy', proxy, 'is usable, setting score to', MAX_SCORE)
        # zadd takes a mapping in redis-py 3.x, same as in add() above
        return self.db.zadd(REDIS_KEY, {proxy: MAX_SCORE})

    def count(self):
        """
        Get the number of proxies.
        :return: count
        """
        return self.db.zcard(REDIS_KEY)

    def all(self):
        """
        Get every proxy in the pool.
        :return: list of all proxies
        """
        return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)

    def batch(self, start, stop):
        """
        Get a batch of proxies.
        :param start: start index
        :param stop: end index
        :return: list of proxies
        """
        return self.db.zrevrange(REDIS_KEY, start, stop - 1)
• PoolEmptyError: custom exception raised when the pool is empty
• RedisClient: the Redis client (see the short usage sketch after this list)
  1. __init__: initializes the Redis connection
  2. add: adds a proxy to Redis
  3. random: returns a random usable proxy
  4. decrease: handles a proxy that failed a test
  5. exists: checks whether a proxy is present
  6. max: sets a proxy to the maximum score
  7. count: returns the number of proxies
  8. all: returns every proxy
  9. batch: returns a batch of proxies
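A minimal sketch of how these methods fit together (it assumes a local Redis instance and the conf.py values shown later; the sample proxy address is made up):

from db import RedisClient

client = RedisClient()
client.add('123.56.78.90:8888')   # a new proxy enters the pool at INITIAL_SCORE (10)
print(client.count())             # how many proxies are currently stored
proxy = client.random()           # prefers a proxy holding the maximum score
client.max(proxy)                 # a passed test bumps it to MAX_SCORE (100)
client.decrease(proxy)            # a failed test costs one point (removed at 0)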

Tester module: tester.py

import asyncio
import aiohttp
import time
try:
    from aiohttp import ClientError
except ImportError:
    # older aiohttp versions expose a different exception class; alias it so the
    # except clause below works either way
    from aiohttp import ClientProxyConnectionError as ClientError
from db import RedisClient
from conf import *


class Tester(object):
    def __init__(self):
        self.redis = RedisClient()

    async def test_single_proxy(self, proxy):
        """
        Test a single proxy.
        :param proxy:
        :return:
        """
        conn = aiohttp.TCPConnector(verify_ssl=False)
        async with aiohttp.ClientSession(connector=conn) as session:
            try:
                if isinstance(proxy, bytes):
                    proxy = proxy.decode('utf-8')
                real_proxy = 'http://' + proxy
                print('Testing', proxy)
                async with session.get(TEST_URL, proxy=real_proxy, timeout=15, allow_redirects=False) as response:
                    if response.status in VALID_STATUS_CODES:
                        self.redis.max(proxy)
                        print('Proxy is usable', proxy)
                    else:
                        self.redis.decrease(proxy)
                        print('Invalid response status', response.status, 'IP', proxy)
            except (ClientError, aiohttp.client_exceptions.ClientConnectorError, asyncio.TimeoutError, AttributeError, aiohttp.client_exceptions.ServerDisconnectedError):
                self.redis.decrease(proxy)
                print('Proxy request failed', proxy)

    def run(self):
        """
        Main test loop.
        :return:
        """
        print('Tester starts running')
        try:
            count = self.redis.count()
            print('Currently', count, 'proxies left')
            for i in range(0, count, BATCH_TEST_SIZE):
                start = i
                stop = min(i + BATCH_TEST_SIZE, count)
                print('Testing proxies', start + 1, '-', stop)
                test_proxies = self.redis.batch(start, stop)
                loop = asyncio.get_event_loop()
                tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
                loop.run_until_complete(asyncio.wait(tasks))
                time.sleep(1)
        except Exception as e:
            print('Tester ran into an error', e.args)
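A small sketch for trying the tester on its own before wiring everything into run.py (it assumes Redis is running and the pool already holds some proxies):

from tester import Tester

if __name__ == '__main__':
    # runs one full pass over the pool in batches of BATCH_TEST_SIZE
    Tester().run()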

API module: api.py

from flask import Flask, g
from db import RedisClient

__all__ = ['app']

app = Flask(__name__)


def get_conn():
    if not hasattr(g, 'redis'):
        g.redis = RedisClient()
    return g.redis


@app.route('/')
def index():
    return '<h2>Welcome to Proxy Pool System</h2>'


@app.route('/random')
def get_proxy():
    """
    Get a proxy
    :return: a random proxy
    """
    conn = get_conn()
    return conn.random()


@app.route('/count')
def get_counts():
    """
    Get the count of proxies
    :return: total size of the pool
    """
    conn = get_conn()
    return str(conn.count())


if __name__ == '__main__':
    app.run()
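Once the API is up (run.py serves it on API_HOST:API_PORT, i.e. 0.0.0.0:5555 with the configuration below), a client can query it like this; a minimal sketch assuming the service is reachable on localhost:

import requests

BASE = 'http://127.0.0.1:5555'  # host/port from conf.py

print(requests.get(BASE + '/count').text)   # current number of proxies in the pool
print(requests.get(BASE + '/random').text)  # one random usable proxy, e.g. 'IP:PORT'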

Configuration file: conf.py

# Redis host
REDIS_HOST = '127.0.0.1'
# Redis port
REDIS_PORT = 6379
# Redis password; use None if there is none
REDIS_PASSWORD = None
REDIS_KEY = 'proxies'

# proxy scores
MAX_SCORE = 100
MIN_SCORE = 0
INITIAL_SCORE = 10

VALID_STATUS_CODES = [200, 302]

# upper limit on the pool size
POOL_UPPER_THRESHOLD = 50000

# test cycle (seconds)
TESTER_CYCLE = 20
# getter cycle (seconds)
GETTER_CYCLE = 300

# test URL; ideally test against the site you plan to crawl through these proxies
TEST_URL = 'http://www.baidu.com'

# API settings
API_HOST = '0.0.0.0'
API_PORT = 5555

# switches
TESTER_ENABLED = True
GETTER_ENABLED = True
API_ENABLED = True

# maximum number of proxies tested per batch
BATCH_TEST_SIZE = 100

Entry point: run.py — runs the getter, tester, and API modules in separate processes

import sys
import io
import time
from multiprocessing import Process
from api import app
from crawler import GetProxy
from tester import Tester
from conf import *

sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')


class Scheduler():
    def schedule_tester(self, cycle=TESTER_CYCLE):
        """
        Test the proxies on a schedule.
        """
        tester = Tester()
        while True:
            print('Tester starts running')
            tester.run()
            time.sleep(cycle)

    def schedule_getter(self, cycle=GETTER_CYCLE):
        """
        Fetch new proxies on a schedule.
        """
        get_proxy = GetProxy()
        while True:
            print('Starting to crawl proxies')
            get_proxy.run()
            time.sleep(cycle)

    def schedule_api(self):
        """
        Start the API.
        """
        app.run(API_HOST, API_PORT)

    def run(self):
        print('Proxy pool starts running')
        if TESTER_ENABLED:
            tester_process = Process(target=self.schedule_tester)
            tester_process.start()
        if GETTER_ENABLED:
            getter_process = Process(target=self.schedule_getter)
            getter_process.start()
        if API_ENABLED:
            api_process = Process(target=self.schedule_api)
            api_process.start()


def main():
    try:
        s = Scheduler()
        s.run()
    except:
        # restart the scheduler if it dies for any reason
        main()


if __name__ == '__main__':
    main()
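Finally, this is how a crawler would actually consume the pool, which is the point of "using proxies". A minimal sketch (the target URL is only an example; it assumes run.py is running and the API answers on 127.0.0.1:5555):

import requests

PROXY_POOL_URL = 'http://127.0.0.1:5555/random'


def get_random_proxy():
    # ask the pool's API for one random usable proxy ('IP:PORT')
    return requests.get(PROXY_POOL_URL).text


def fetch(url):
    proxy = get_random_proxy()
    proxies = {'http': 'http://' + proxy}
    # route the request through the pooled proxy; time out quickly on bad ones
    return requests.get(url, proxies=proxies, timeout=10)


if __name__ == '__main__':
    print(fetch('http://httpbin.org/ip').text)  # shows the exit IP seen by the server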

     

Reposted from: https://www.cnblogs.com/jp-mao/p/10106085.html
