本文首发于
本文使用多线程实现一个简易爬虫框架,让我们只需要关注网页的解析,不用自己设置多线程、队列等事情。调用形式类似scrapy,而诸多功能还不完善,因此称为简易爬虫框架。
这个框架实现了Spider
类,让我们只需要写出下面代码,即可多线程运行爬虫
class DouBan(Spider): def __init__(self): super(DouBan, self).__init__() self.start_url = 'https://movie.douban.com/top250' self.filename = 'douban.json' # 覆盖默认值 self.output_result = False self.thread_num = 10 def start_requests(self): # 覆盖默认函数 yield (self.start_url, self.parse_first) def parse_first(self, url): # 只需要yield待爬url和回调函数 r = requests.get(url) soup = BeautifulSoup(r.content, 'lxml') movies = soup.find_all('div', class_ = 'info')[:5] for movie in movies: url = movie.find('div', class_ = 'hd').a['href'] yield (url, self.parse_second) nextpage = soup.find('span', class_ = 'next').a if nextpage: nexturl = self.start_url + nextpage['href'] yield (nexturl, self.parse_first) else: self.running = False # 表明运行到这里则不会继续添加待爬URL队列 def parse_second(self, url): r = requests.get(url) soup = BeautifulSoup(r.content, 'lxml') mydict = {} title = soup.find('span', property = 'v:itemreviewed') mydict['title'] = title.text if title else None duration = soup.find('span', property = 'v:runtime') mydict['duration'] = duration.text if duration else None time = soup.find('span', property = 'v:initialReleaseDate') mydict['time'] = time.text if time else None yield mydictif __name__ == '__main__': douban = DouBan() douban.run()复制代码
可以看到这个使用方式和scrapy非常相似
- 继承类,只需要写解析函数(因为是简易框架,因此还需要写请求函数)
- 用yield返回数据或者新的请求及回调函数
- 自动多线程(scrapy是异步)
- 运行都一样只要
run
- 可以设置是否存储到文件等,只是没有考虑可扩展性(数据库等)
下面我们来说一说它是怎么实现的
我们可以对比下面两个版本,一个是中的使用方法,另一个是进行了一些修改,将一些功能抽象出来,以便扩展功能。
上一篇文章版本代码请读者自行点击链接去看,下面是修改后的版本代码。
import requestsimport timeimport threadingfrom queue import Queue, Emptyimport jsonfrom bs4 import BeautifulSoupdef run_time(func): def wrapper(*args, **kw): start = time.time() func(*args, **kw) end = time.time() print('running', end-start, 's') return wrapperclass Spider(): def __init__(self): self.start_url = 'https://movie.douban.com/top250' self.qtasks = Queue() self.data = list() self.thread_num = 5 self.running = True def start_requests(self): yield (self.start_url, self.parse_first) def parse_first(self, url): r = requests.get(url) soup = BeautifulSoup(r.content, 'lxml') movies = soup.find_all('div', class_ = 'info')[:5] for movie in movies: url = movie.find('div', class_ = 'hd').a['href'] yield (url, self.parse_second) nextpage = soup.find('span', class_ = 'next').a if nextpage: nexturl = self.start_url + nextpage['href'] yield (nexturl, self.parse_first) else: self.running = False def parse_second(self, url): r = requests.get(url) soup = BeautifulSoup(r.content, 'lxml') mydict = {} title = soup.find('span', property = 'v:itemreviewed') mydict['title'] = title.text if title else None duration = soup.find('span', property = 'v:runtime') mydict['duration'] = duration.text if duration else None time = soup.find('span', property = 'v:initialReleaseDate') mydict['time'] = time.text if time else None yield mydict def start_req(self): for task in self.start_requests(): self.qtasks.put(task) def parses(self): while self.running or not self.qtasks.empty(): try: url, func = self.qtasks.get(timeout=3) print('crawling', url) for task in func(url): if isinstance(task, tuple): self.qtasks.put(task) elif isinstance(task, dict): self.data.append(task) else: raise TypeError('parse functions have to yield url-function tuple or data dict') except Empty: print('{}: Timeout occurred'.format(threading.current_thread().name)) print(threading.current_thread().name, 'finished') @run_time def run(self, filename=False): ths = [] th1 = threading.Thread(target=self.start_req) th1.start() ths.append(th1) for _ in range(self.thread_num): th = threading.Thread(target=self.parses) th.start() ths.append(th) for th in ths: th.join() if filename: s = json.dumps(self.data, ensure_ascii=False, indent=4) with open(filename, 'w', encoding='utf-8') as f: f.write(s) print('Data crawling is finished.')if __name__ == '__main__': Spider().run(filename='frame.json')复制代码
这个改进主要思路如下
- 我们希望写解析函数时,像scrapy一样,用yield返回待抓取的URL和它对应的解析函数,于是就做了一个包含(URL,解析函数)的元组队列,之后只要不断从队列中获取元素,用函数解析url即可,这个提取的过程使用多线程
yield
可以返回两种类型数据,一种是元组(URL,解析函数),一种是字典(即我们要的数据),通过判断分别加入不同队列中。元组队列是不断消耗和增添的过程,而字典队列是一只增加,最后再一起输出到文件中- 在
queue.get
时,加入了timeout
参数并做异常处理,保证每一个线程都能结束
这里其实没有特别的知识,也不需要解释很多,读者自己复制代码到文本文件里对比就知道了
然后框架的形式就是从第二种中,剥离一些通用的设定,让用户自定义每个爬虫独特的部分,完整代码如下(本文开头的代码就是下面这块代码的后半部分)
import requestsimport timeimport threadingfrom queue import Queue, Emptyimport jsonfrom bs4 import BeautifulSoupdef run_time(func): def wrapper(*args, **kw): start = time.time() func(*args, **kw) end = time.time() print('running', end-start, 's') return wrapperclass Spider(): def __init__(self): self.qtasks = Queue() self.data = list() self.thread_num = 5 self.running = True self.filename = False self.output_result = True def start_requests(self): yield (self.start_url, self.parse) def start_req(self): for task in self.start_requests(): self.qtasks.put(task) def parses(self): while self.running or not self.qtasks.empty(): try: url, func = self.qtasks.get(timeout=3) print('crawling', url) for task in func(url): if isinstance(task, tuple): self.qtasks.put(task) elif isinstance(task, dict): if self.output_result: print(task) self.data.append(task) else: raise TypeError('parse functions have to yield url-function tuple or data dict') except Empty: print('{}: Timeout occurred'.format(threading.current_thread().name)) print(threading.current_thread().name, 'finished') @run_time def run(self): ths = [] th1 = threading.Thread(target=self.start_req) th1.start() ths.append(th1) for _ in range(self.thread_num): th = threading.Thread(target=self.parses) th.start() ths.append(th) for th in ths: th.join() if self.filename: s = json.dumps(self.data, ensure_ascii=False, indent=4) with open(self.filename, 'w', encoding='utf-8') as f: f.write(s) print('Data crawling is finished.')class DouBan(Spider): def __init__(self): super(DouBan, self).__init__() self.start_url = 'https://movie.douban.com/top250' self.filename = 'douban.json' # 覆盖默认值 self.output_result = False self.thread_num = 10 def start_requests(self): # 覆盖默认函数 yield (self.start_url, self.parse_first) def parse_first(self, url): # 只需要yield待爬url和回调函数 r = requests.get(url) soup = BeautifulSoup(r.content, 'lxml') movies = soup.find_all('div', class_ = 'info')[:5] for movie in movies: url = movie.find('div', class_ = 'hd').a['href'] yield (url, self.parse_second) nextpage = soup.find('span', class_ = 'next').a if nextpage: nexturl = self.start_url + nextpage['href'] yield (nexturl, self.parse_first) else: self.running = False # 表明运行到这里则不会继续添加待爬URL队列 def parse_second(self, url): r = requests.get(url) soup = BeautifulSoup(r.content, 'lxml') mydict = {} title = soup.find('span', property = 'v:itemreviewed') mydict['title'] = title.text if title else None duration = soup.find('span', property = 'v:runtime') mydict['duration'] = duration.text if duration else None time = soup.find('span', property = 'v:initialReleaseDate') mydict['time'] = time.text if time else None yield mydictif __name__ == '__main__': douban = DouBan() douban.run()复制代码
我们这样剥离之后,就只需要写后半部分的代码,只关心网页的解析,不用考虑多线程的实现了。
欢迎关注我的知乎专栏
专栏主页:
专栏目录:
版本说明: