# -*- encoding:utf-8 -*-
'''
@Author : dingjiawen
@Date   : 2023/11/7 19:09
@Usage  :
@Desc   : A basic exercise: scrape the movie descriptions, detail pages, etc.
          from https://ssr1.scrape.center
'''
import requests
import logging
import re
from urllib.parse import urljoin
from os import makedirs
from os.path import exists
from pyquery import PyQuery as pq
import json
# log level and format for console output
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s: %(message)s')
BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10
RESULTS_DIR = 'results'
# create the results directory on first run
exists(RESULTS_DIR) or makedirs(RESULTS_DIR)

def scrape_page(url):
    """
    scrape page by url and return its html
    :param url: page url
    :return: html of page
    """
    logging.info('scraping %s...', url)
    try:
        # a timeout keeps one hung request from stalling the whole crawl
        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            return response.text
        logging.error('get invalid status code %s while scraping %s', response.status_code, url)
    except requests.RequestException:
        logging.error('error occurred while scraping %s', url, exc_info=True)

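# Optional hardening (a sketch, not part of the original exercise): a shared Session
# with automatic retries tolerates transient network failures. scrape_page() above
# does not use it; swapping requests.get for _session.get would enable it.
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

_session = requests.Session()
_session.mount('https://', HTTPAdapter(max_retries=Retry(total=3, backoff_factor=0.5)))
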
def scrape_index(page):
    index_url = f'{BASE_URL}/page/{page}'
    return scrape_page(index_url)

# extract the detail-page URLs from the index page HTML
def parse_index(html):
    # scrape_page() returns None on failure, so bail out before re.findall chokes
    if not html:
        return []
    pattern = '<a.*?href="(.*?)".*?class="name">'
    items = re.findall(pattern, html, re.S)
    if not items:
        return []
    for item in items:
        detail_url = urljoin(BASE_URL, item)
        logging.info('get detail url %s', detail_url)
        yield detail_url
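# Example of the join (href shape assumed, not taken from a live response): an
# index entry like <a href="/detail/1" class="name"> gives
# urljoin(BASE_URL, '/detail/1') == 'https://ssr1.scrape.center/detail/1'.
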
def scrape_detail(url):
    return scrape_page(url)

def parse_detail(html):
    cover_pattern = re.compile(
        'class="item.*?<img.*?src="(.*?)".*?class="cover">', re.S)
    name_pattern = re.compile('<h2.*?>(.*?)</h2>')
    categories_pattern = re.compile(
        '<button.*?category.*?<span>(.*?)</span>.*?</button>', re.S)
    # '上映' ("released") labels the release date on the detail page
    published_at_pattern = re.compile(r'(\d{4}-\d{2}-\d{2})\s?上映')
    drama_pattern = re.compile('<div.*?drama.*?>.*?<p.*?>(.*?)</p>', re.S)
    score_pattern = re.compile('<p.*?score.*?>(.*?)</p>', re.S)
    # search once per field and reuse the match, instead of searching twice
    cover_match = cover_pattern.search(html)
    cover = cover_match.group(1).strip() if cover_match else None
    name_match = name_pattern.search(html)
    name = name_match.group(1).strip() if name_match else None
    # findall already returns [] when nothing matches
    categories = categories_pattern.findall(html)
    published_at_match = published_at_pattern.search(html)
    published_at = published_at_match.group(1) if published_at_match else None
    drama_match = drama_pattern.search(html)
    drama = drama_match.group(1).strip() if drama_match else None
    score_match = score_pattern.search(html)
    score = float(score_match.group(1).strip()) if score_match else None
    return {
        'cover': cover,
        'name': name,
        'categories': categories,
        'published_at': published_at,
        'drama': drama,
        'score': score
    }
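# Sanity check against a hypothetical fragment (not real site markup); fields the
# fragment lacks come back as None / []:
#   parse_detail('<h2 class="m-b-sm">霸王别姬</h2><p class="score m-t-md"> 9.5</p>')
#   -> {'cover': None, 'name': '霸王别姬', 'categories': [], 'published_at': None,
#       'drama': None, 'score': 9.5}
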
def parse_detailByPyQuery(html):
    doc = pq(html)
    cover = doc('img.cover').attr('src')
    name = doc('a > h2').text()
    categories = [item.text() for item in doc('.categories button span').items()]
    published_at = doc('.info:contains(上映)').text()
    published_at = re.search(r'(\d{4}-\d{2}-\d{2})', published_at).group(1) \
        if published_at and re.search(r'\d{4}-\d{2}-\d{2}', published_at) else None
    drama = doc('.drama p').text()
    score = doc('p.score').text()
    score = float(score) if score else None
    return {
        'cover': cover,
        'name': name,
        'categories': categories,
        'published_at': published_at,
        'drama': drama,
        'score': score
    }

def save_data(data):
    """
    save to json file
    :param data:
    :return:
    """
    name = data.get('name')
    data_path = f'{RESULTS_DIR}/{name}.json'
    # ensure_ascii=False keeps Chinese text readable in the file instead of
    # \uXXXX escapes; indent=2 pretty-prints the JSON with two-space indents
    with open(data_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
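# Caveat (not handled in this exercise): a movie name containing a path separator
# such as '/' would make data_path invalid; real code might sanitize it first,
# e.g. name.replace('/', '_').
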
def main():
    # pages are 1-indexed, so include TOTAL_PAGE itself
    for page in range(1, TOTAL_PAGE + 1):
        index_html = scrape_index(page)
        detail_urls = parse_index(index_html)
        for detail_url in detail_urls:
            data = parse_detail(scrape_detail(detail_url))
            logging.info("get detail data %s", data)
            save_data(data)
            logging.info("save data successfully")

def mainByMulti(page):
    """scrape one index page end to end; the worker for the process pool below"""
    index_html = scrape_index(page)
    detail_urls = parse_index(index_html)
    for detail_url in detail_urls:
        data = parse_detail(scrape_detail(detail_url))
        logging.info("get detail data %s", data)
        save_data(data)
        logging.info("save data successfully")
if __name__ == '__main__':
    # single process:
    # main()
    # multiprocessing: one worker per index page
    import multiprocessing
    pool = multiprocessing.Pool()
    pages = range(1, TOTAL_PAGE + 1)
    pool.map(mainByMulti, pages)
    pool.close()
    pool.join()