# -*- encoding:utf-8 -*-
'''
@Author : dingjiawen
@Date : 2023/11/7 19:09
@Usage :
@Desc : 一个基本的练习:爬取 https://ssr1.scrape.center 电影描述以及detail等
'''
import requests
import logging
import re
from urllib.parse import urljoin
from os import makedirs
from os.path import exists
from pyquery import PyQuery as pq
import json
# Log at INFO level and above, with a timestamp and level on every record.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s: %(message)s')

# Root of the site being scraped; relative links are resolved against it.
BASE_URL = 'https://ssr1.scrape.center'
# Page count used to build the range of index pages to crawl.
TOTAL_PAGE = 10
# Directory that receives one JSON file per scraped movie.
RESULTS_DIR = 'results'

# exist_ok=True replaces the check-then-create form
# `exists(...) or makedirs(...)`, which can raise FileExistsError when
# several processes import this module at the same time (the script runs
# its workers in a multiprocessing pool).
makedirs(RESULTS_DIR, exist_ok=True)
def scrape_page(url, timeout=10):
    """
    Download a page and return its HTML.

    :param url: page url
    :param timeout: seconds to wait for the server before giving up; without
        a timeout ``requests.get`` can block indefinitely on a stalled
        connection
    :return: html of page when the status code is 200, otherwise ``None``
        (the failure is logged)
    """
    logging.info('scraping %s...', url)
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException:
        # Connection errors and timeouts both end up here; keep the traceback.
        logging.error('error occurred while scraping %s', url, exc_info=True)
        return None
    if response.status_code == 200:
        return response.text
    logging.error('get invalid status codes %s while scraping %s',
                  response.status_code, url)
    return None
def scrape_index(page):
    """
    Fetch one index (listing) page.

    :param page: page number inserted into the ``/page/<page>`` url
    :return: whatever ``scrape_page`` returns for that url
    """
    return scrape_page(f'{BASE_URL}/page/{page}')
# Extract the detail-page urls from the HTML of an index page.
def parse_index(html):
    """
    Yield the absolute url of every detail page linked from an index page.

    :param html: HTML of an index page; ``None`` or an empty string (for
        example after a failed download) yields nothing
    :return: generator of absolute detail-page urls
    """
    if not html:
        return
    # The pattern used to be an empty string, which matches at every
    # position and therefore produced BASE_URL once per character.
    # NOTE(review): assumes detail links are rendered as
    # `<a ... href="..." class="name">` -- confirm against the live markup.
    # `[^>]` / `[^"]` keep the match inside a single tag, so an unrelated
    # earlier <a> can never be paired with a later class="name"; there is
    # no `.` in the pattern, so the former re.S flag is not needed.
    pattern = re.compile(r'<a\b[^>]*?\shref="([^"]*)"[^>]*?\sclass="name"')
    for item in pattern.findall(html):
        detail_url = urljoin(BASE_URL, item)
        logging.info('get detail url %s', detail_url)
        yield detail_url
def scrape_detail(url):
    """
    Fetch a detail page.

    :param url: url of the detail page
    :return: the result of ``scrape_page`` for ``url``
    """
    detail_html = scrape_page(url)
    return detail_html
# Patterns are compiled once at import time and written as raw strings so
# that `\d` / `\s` reach the regex engine instead of being treated as
# (invalid) string escapes.
# NOTE(review): the HTML fragments of these patterns were missing from the
# file (one literal was even split across two lines, which is a syntax
# error, and the cover pattern had no capture group). They are rebuilt here
# to target the same elements parse_detailByPyQuery selects (img.cover, h2,
# category buttons, .drama p, p.score) -- confirm against the live markup.
_COVER_PATTERN = re.compile(
    r'class="item.*?<img.*?src="(.*?)".*?class="cover">', re.S)
_NAME_PATTERN = re.compile(r'<h2.*?>(.*?)</h2>')
_CATEGORIES_PATTERN = re.compile(
    r'<button.*?category.*?<span>(.*?)</span>.*?</button>', re.S)
_PUBLISHED_AT_PATTERN = re.compile(r'(\d{4}-\d{2}-\d{2})\s?上映')
_DRAMA_PATTERN = re.compile(r'<div.*?drama.*?>.*?<p.*?>(.*?)</p>', re.S)
_SCORE_PATTERN = re.compile(r'<p.*?score.*?>(.*?)</p>', re.S)


def _first_group(pattern, html, strip=True):
    """Return group 1 of the first match of *pattern* in *html*, or None."""
    match = pattern.search(html)
    if match is None:
        return None
    text = match.group(1)
    return text.strip() if strip else text


def parse_detail(html):
    """
    Extract movie fields from the HTML of a detail page with regular
    expressions.

    :param html: HTML of a detail page (must be a string)
    :return: dict with keys ``cover``, ``name``, ``categories``,
        ``published_at``, ``drama`` and ``score``; a field that is not found
        is ``None`` (``categories`` is an empty list)
    """
    # Each pattern is searched once; previously every search ran twice
    # (once for the condition, once for the value).
    score_text = _first_group(_SCORE_PATTERN, html)
    return {
        'cover': _first_group(_COVER_PATTERN, html),
        'name': _first_group(_NAME_PATTERN, html),
        'categories': _CATEGORIES_PATTERN.findall(html),
        # The date group contains only digits and dashes, nothing to strip.
        'published_at': _first_group(_PUBLISHED_AT_PATTERN, html, strip=False),
        'drama': _first_group(_DRAMA_PATTERN, html),
        'score': float(score_text) if score_text is not None else None
    }
def parse_detailByPyQuery(html):
    """
    Extract movie fields from the HTML of a detail page with PyQuery CSS
    selectors.

    :param html: HTML of a detail page
    :return: dict with keys ``cover``, ``name``, ``categories``,
        ``published_at``, ``drama`` and ``score``
    """
    doc = pq(html)
    cover = doc('img.cover').attr('src')
    name = doc('a > h2').text()
    categories = [item.text() for item in doc('.categories button span').items()]

    # The selected ".info" text contains the "上映" label next to the date;
    # only the yyyy-mm-dd part is kept. The pattern is a raw string because
    # "\d" is not a valid str escape, and it is searched once instead of
    # twice.
    info_text = doc('.info:contains(上映)').text()
    date_match = re.search(r'(\d{4}-\d{2}-\d{2})', info_text) if info_text else None
    published_at = date_match.group(1) if date_match else None

    drama = doc('.drama p').text()
    score_text = doc('p.score').text()
    # An empty selection yields an empty string, reported as "no score".
    score = float(score_text) if score_text else None
    return {
        'cover': cover,
        'name': name,
        'categories': categories,
        'published_at': published_at,
        'drama': drama,
        'score': score
    }
def save_data(data):
    """
    Save one record as a JSON file named after its ``name`` value.

    :param data: dict to serialise; ``data.get('name')`` is used to build
        the path ``<RESULTS_DIR>/<name>.json``
    :return: None
    """
    name = data.get('name')
    data_path = f'{RESULTS_DIR}/{name}.json'
    # The with-block closes (and flushes) the file even if json.dump raises;
    # the former bare open() call never closed the handle explicitly.
    with open(data_path, 'w', encoding='utf-8') as file:
        # ensure_ascii=False writes Chinese characters as readable text
        # rather than \uXXXX escapes; indent=2 pretty-prints with two spaces.
        json.dump(data, file, ensure_ascii=False, indent=2)
def main():
    """
    Crawl all index pages one after another in the current process.

    Each page is processed by ``mainByMulti`` so the single-process and the
    multi-process entry points share one per-page implementation.
    """
    # Pages are numbered from 1; "+ 1" makes TOTAL_PAGE itself part of the
    # range (range(1, TOTAL_PAGE) silently skipped the last page).
    for page in range(1, TOTAL_PAGE + 1):
        mainByMulti(page)
def mainByMulti(page):
    """
    Crawl a single index page: fetch it, follow every detail link and save
    each parsed record. This is the function handed to ``pool.map``.

    :param page: number of the index page to crawl
    """
    index_html = scrape_index(page)
    if not index_html:
        # scrape_page returns None when a download fails (already logged);
        # skip the page instead of raising TypeError inside the regex parser.
        logging.warning('skip page %s: index page could not be fetched', page)
        return
    for detail_url in parse_index(index_html):
        detail_html = scrape_detail(detail_url)
        if not detail_html:
            # One broken detail page must not abort the rest of the page.
            logging.warning('skip %s: detail page could not be fetched',
                            detail_url)
            continue
        data = parse_detail(detail_html)
        logging.info("get detail data %s", data)
        save_data(data)
        logging.info("save data successfully")
if __name__ == '__main__':
    # Single process:
    # main()
    # Multiple processes: one task per index page.
    import multiprocessing

    # Pages are numbered from 1; "+ 1" makes TOTAL_PAGE itself part of the
    # range (range(1, TOTAL_PAGE) left the last page uncrawled).
    pages = range(1, TOTAL_PAGE + 1)
    # The with-block releases the worker processes even if map() raises.
    with multiprocessing.Pool() as pool:
        pool.map(mainByMulti, pages)
        pool.close()
        pool.join()