A Simple Multi-threaded Crawler Framework
Published: 2019-06-09


This article was first published at:

This article uses multi-threading to build a simple crawler framework, so that we only need to focus on parsing the pages and do not have to set up threads, queues, and so on ourselves. The calling interface is similar to scrapy's, but many features are still incomplete, which is why I call it a simple crawler framework.

The framework implements a Spider class; we only need to write the code below to run a crawler across multiple threads:

class DouBan(Spider):
    def __init__(self):
        super(DouBan, self).__init__()
        self.start_url = 'https://movie.douban.com/top250'
        self.filename = 'douban.json'  # override the default value
        self.output_result = False
        self.thread_num = 10

    def start_requests(self):  # override the default method
        yield (self.start_url, self.parse_first)

    def parse_first(self, url):  # just yield the URL to crawl and its callback
        r = requests.get(url)
        soup = BeautifulSoup(r.content, 'lxml')
        movies = soup.find_all('div', class_='info')[:5]
        for movie in movies:
            url = movie.find('div', class_='hd').a['href']
            yield (url, self.parse_second)

        nextpage = soup.find('span', class_='next').a
        if nextpage:
            nexturl = self.start_url + nextpage['href']
            yield (nexturl, self.parse_first)
        else:
            self.running = False  # reaching this point means no more URLs will be added to the task queue

    def parse_second(self, url):
        r = requests.get(url)
        soup = BeautifulSoup(r.content, 'lxml')
        mydict = {}
        title = soup.find('span', property='v:itemreviewed')
        mydict['title'] = title.text if title else None
        duration = soup.find('span', property='v:runtime')
        mydict['duration'] = duration.text if duration else None
        time = soup.find('span', property='v:initialReleaseDate')
        mydict['time'] = time.text if time else None
        yield mydict

if __name__ == '__main__':
    douban = DouBan()
    douban.run()

As you can see, this usage is very similar to scrapy:

  • Subclass Spider and write only the parse functions (since this is a bare-bones framework, you still have to issue the HTTP requests yourself)
  • Use yield to return either data or a new request together with its callback function
  • Multi-threading is handled automatically (scrapy uses asynchronous I/O instead)
  • Running works the same way: just call run()
  • You can configure whether results are written to a file, and so on, though extensibility (databases and the like) has not been considered; a minimal subclass sketch follows this list
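For a sense of how little a user has to write, here is a minimal hypothetical subclass built on the Spider base class shown later in this article. The target site (quotes.toscrape.com), the selectors, and the field names are illustrative assumptions, not part of the original post.

import requests
from bs4 import BeautifulSoup

# Hypothetical minimal crawler: fetch a single page and yield one dict per quote.
# The site and CSS classes below are assumptions used only for illustration.
class Quotes(Spider):
    def __init__(self):
        super(Quotes, self).__init__()
        self.start_url = 'http://quotes.toscrape.com/'
        self.filename = 'quotes.json'   # collected dicts are dumped here when the run ends

    # The base class's default start_requests yields (self.start_url, self.parse),
    # so defining parse is enough for a single-page crawl.
    def parse(self, url):
        r = requests.get(url)
        soup = BeautifulSoup(r.content, 'lxml')
        for quote in soup.find_all('div', class_='quote'):
            yield {'text': quote.find('span', class_='text').text,
                   'author': quote.find('small', class_='author').text}
        self.running = False  # no new URLs will be queued, so worker threads may exit

if __name__ == '__main__':
    Quotes().run()

Running this would start the producer thread plus thread_num worker threads and write the collected dicts to quotes.json, exactly as in the DouBan example above.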

Now let's look at how it is implemented.

We can compare the two versions below: one is the approach from the previous article, and the other makes some modifications, abstracting out some of the functionality so that it can be extended.

For the previous article's version of the code, please follow the link yourself; below is the modified version.

import requests
import time
import threading
from queue import Queue, Empty
import json
from bs4 import BeautifulSoup


def run_time(func):
    def wrapper(*args, **kw):
        start = time.time()
        func(*args, **kw)
        end = time.time()
        print('running', end - start, 's')
    return wrapper


class Spider():
    def __init__(self):
        self.start_url = 'https://movie.douban.com/top250'
        self.qtasks = Queue()
        self.data = list()
        self.thread_num = 5
        self.running = True

    def start_requests(self):
        yield (self.start_url, self.parse_first)

    def parse_first(self, url):
        r = requests.get(url)
        soup = BeautifulSoup(r.content, 'lxml')
        movies = soup.find_all('div', class_='info')[:5]
        for movie in movies:
            url = movie.find('div', class_='hd').a['href']
            yield (url, self.parse_second)

        nextpage = soup.find('span', class_='next').a
        if nextpage:
            nexturl = self.start_url + nextpage['href']
            yield (nexturl, self.parse_first)
        else:
            self.running = False

    def parse_second(self, url):
        r = requests.get(url)
        soup = BeautifulSoup(r.content, 'lxml')
        mydict = {}
        title = soup.find('span', property='v:itemreviewed')
        mydict['title'] = title.text if title else None
        duration = soup.find('span', property='v:runtime')
        mydict['duration'] = duration.text if duration else None
        time = soup.find('span', property='v:initialReleaseDate')
        mydict['time'] = time.text if time else None
        yield mydict

    def start_req(self):
        for task in self.start_requests():
            self.qtasks.put(task)

    def parses(self):
        while self.running or not self.qtasks.empty():
            try:
                url, func = self.qtasks.get(timeout=3)
                print('crawling', url)
                for task in func(url):
                    if isinstance(task, tuple):
                        self.qtasks.put(task)
                    elif isinstance(task, dict):
                        self.data.append(task)
                    else:
                        raise TypeError('parse functions have to yield url-function tuple or data dict')
            except Empty:
                print('{}: Timeout occurred'.format(threading.current_thread().name))
        print(threading.current_thread().name, 'finished')

    @run_time
    def run(self, filename=False):
        ths = []
        th1 = threading.Thread(target=self.start_req)
        th1.start()
        ths.append(th1)
        for _ in range(self.thread_num):
            th = threading.Thread(target=self.parses)
            th.start()
            ths.append(th)
        for th in ths:
            th.join()

        if filename:
            s = json.dumps(self.data, ensure_ascii=False, indent=4)
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(s)
        print('Data crawling is finished.')


if __name__ == '__main__':
    Spider().run(filename='frame.json')

The main ideas behind this improvement are as follows:

  • When writing a parse function, we want to yield the URL to crawl together with its corresponding parse function, just as in scrapy. So we maintain a queue of (URL, parse function) tuples; the workers then simply keep taking elements off the queue and parsing each URL with its function, and this consumption runs on multiple threads (a stripped-down sketch of the pattern follows this list)
  • yield can return two kinds of data: a (URL, parse function) tuple, or a dict (the data we actually want), and a type check routes each to the right place. The tuple queue is continuously consumed and replenished, while the dict list only ever grows and is written out to a file at the end
  • When calling queue.get, a timeout parameter and exception handling were added, which guarantees that every thread can finish
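To make this pattern easier to see on its own, here is a stripped-down, self-contained sketch of the producer/consumer loop. The parse callback, the example URL, and the thread count are toy assumptions for illustration; the framework's real worker is the parses method shown above.

import threading
from queue import Queue, Empty

q = Queue()        # holds (url, callback) tuples, mirroring the framework's qtasks
results = []       # collected data dicts; only appended to, printed at the end
running = True     # set to False once no new tasks will be produced

def parse(url):
    # Toy callback: pretend to parse the page and yield one data dict.
    global running
    yield {'url': url, 'length': len(url)}
    running = False   # signal the workers that no more URLs will be queued

def worker():
    # Keep consuming while new tasks may still arrive or the queue is not drained.
    while running or not q.empty():
        try:
            url, func = q.get(timeout=3)  # timeout lets the thread re-check the loop condition
        except Empty:
            continue
        for item in func(url):
            if isinstance(item, tuple):   # (url, callback): a new task, back into the queue
                q.put(item)
            elif isinstance(item, dict):  # a data dict: store it
                results.append(item)

q.put(('https://example.com', parse))
threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)

The timeout on get is what lets an idle worker periodically re-check the loop condition, notice that running is False and the queue is empty, and exit.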

There is no special knowledge involved here and not much needs explaining; if you copy both versions into text files and compare them, the changes become obvious.

The framework is then obtained by stripping the generic machinery out of the second version, leaving the user to define only the parts unique to each crawler. The complete code is below (the code at the beginning of this article is the second half of this block):

import requests
import time
import threading
from queue import Queue, Empty
import json
from bs4 import BeautifulSoup


def run_time(func):
    def wrapper(*args, **kw):
        start = time.time()
        func(*args, **kw)
        end = time.time()
        print('running', end - start, 's')
    return wrapper


class Spider():
    def __init__(self):
        self.qtasks = Queue()
        self.data = list()
        self.thread_num = 5
        self.running = True
        self.filename = False
        self.output_result = True

    def start_requests(self):
        yield (self.start_url, self.parse)

    def start_req(self):
        for task in self.start_requests():
            self.qtasks.put(task)

    def parses(self):
        while self.running or not self.qtasks.empty():
            try:
                url, func = self.qtasks.get(timeout=3)
                print('crawling', url)
                for task in func(url):
                    if isinstance(task, tuple):
                        self.qtasks.put(task)
                    elif isinstance(task, dict):
                        if self.output_result:
                            print(task)
                        self.data.append(task)
                    else:
                        raise TypeError('parse functions have to yield url-function tuple or data dict')
            except Empty:
                print('{}: Timeout occurred'.format(threading.current_thread().name))
        print(threading.current_thread().name, 'finished')

    @run_time
    def run(self):
        ths = []
        th1 = threading.Thread(target=self.start_req)
        th1.start()
        ths.append(th1)
        for _ in range(self.thread_num):
            th = threading.Thread(target=self.parses)
            th.start()
            ths.append(th)
        for th in ths:
            th.join()

        if self.filename:
            s = json.dumps(self.data, ensure_ascii=False, indent=4)
            with open(self.filename, 'w', encoding='utf-8') as f:
                f.write(s)
        print('Data crawling is finished.')


class DouBan(Spider):
    def __init__(self):
        super(DouBan, self).__init__()
        self.start_url = 'https://movie.douban.com/top250'
        self.filename = 'douban.json'  # override the default value
        self.output_result = False
        self.thread_num = 10

    def start_requests(self):  # override the default method
        yield (self.start_url, self.parse_first)

    def parse_first(self, url):  # just yield the URL to crawl and its callback
        r = requests.get(url)
        soup = BeautifulSoup(r.content, 'lxml')
        movies = soup.find_all('div', class_='info')[:5]
        for movie in movies:
            url = movie.find('div', class_='hd').a['href']
            yield (url, self.parse_second)

        nextpage = soup.find('span', class_='next').a
        if nextpage:
            nexturl = self.start_url + nextpage['href']
            yield (nexturl, self.parse_first)
        else:
            self.running = False  # reaching this point means no more URLs will be added to the task queue

    def parse_second(self, url):
        r = requests.get(url)
        soup = BeautifulSoup(r.content, 'lxml')
        mydict = {}
        title = soup.find('span', property='v:itemreviewed')
        mydict['title'] = title.text if title else None
        duration = soup.find('span', property='v:runtime')
        mydict['duration'] = duration.text if duration else None
        time = soup.find('span', property='v:initialReleaseDate')
        mydict['time'] = time.text if time else None
        yield mydict


if __name__ == '__main__':
    douban = DouBan()
    douban.run()

After this separation, we only need to write the second half of the code, caring only about parsing the pages and no longer about the multi-threading implementation.


Reposted from: https://juejin.im/post/5b129bd7e51d4506a74d22f4
