# -*- encoding:utf-8 -*-

'''
@Author : dingjiawen
@Date : 2023/11/7 19:09
@Usage :
@Desc : A basic exercise: scrape the movie descriptions, details, etc.
        from https://ssr1.scrape.center
'''
import requests
import logging
import re
from urllib.parse import urljoin
from os import makedirs
from os.path import exists
from pyquery import PyQuery as pq
import json

# Log level and format used by the whole script.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s: %(message)s')

BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10
# Seconds to wait for a response before giving up on a request.
REQUEST_TIMEOUT = 10

RESULTS_DIR = 'results'
# exist_ok avoids a FileExistsError race when several worker processes
# import this module at the same time.
makedirs(RESULTS_DIR, exist_ok=True)

# NOTE(review): the HTML-like parts of the pattern literals below were missing
# from the reviewed copy (tags stripped, the index pattern was an empty
# string). They are reconstructed to target the same elements that
# parse_detailByPyQuery selects (img.cover, h2, .categories button span,
# .drama p, p.score) -- confirm against the live page markup.
_INDEX_LINK_PATTERN = re.compile(r'<a.*?href="(.*?)".*?class="name">', re.S)
_COVER_PATTERN = re.compile(
    r'class="item.*?<img.*?src="(.*?)".*?class="cover">', re.S)
_NAME_PATTERN = re.compile(r'<h2.*?>(.*?)</h2>')
_CATEGORIES_PATTERN = re.compile(
    r'<button.*?category.*?<span>(.*?)</span>.*?</button>', re.S)
_PUBLISHED_AT_PATTERN = re.compile(r'(\d{4}-\d{2}-\d{2})\s?上映')
_DRAMA_PATTERN = re.compile(r'<div.*?drama.*?>.*?<p.*?>(.*?)</p>', re.S)
_SCORE_PATTERN = re.compile(r'<p.*?score.*?>(.*?)</p>', re.S)
_DATE_PATTERN = re.compile(r'(\d{4}-\d{2}-\d{2})')


def scrape_page(url, timeout=REQUEST_TIMEOUT):
    """
    Download a page and return its HTML.

    :param url: page url
    :param timeout: seconds to wait for the server before aborting
    :return: html of the page, or None when the request fails or the
             status code is not 200
    """
    logging.info('scraping %s...', url)
    try:
        # Without a timeout a stalled connection would block forever.
        response = requests.get(url, timeout=timeout)
    except requests.RequestException:
        logging.error('error occurred while scraping %s', url, exc_info=True)
        return None
    if response.status_code == 200:
        return response.text
    logging.error('get invalid status codes %s while scraping %s',
                  response.status_code, url)
    return None


def scrape_index(page):
    """
    Download one index (listing) page.

    :param page: 1-based page number
    :return: html of the index page, or None on failure
    """
    index_url = f'{BASE_URL}/page/{page}'
    return scrape_page(index_url)


def parse_index(html):
    """
    Extract the detail-page URLs from the HTML of an index page.

    :param html: html of an index page; None (failed download) yields nothing
    :return: generator of absolute detail-page urls
    """
    if not html:
        return
    for item in _INDEX_LINK_PATTERN.findall(html):
        detail_url = urljoin(BASE_URL, item)
        logging.info('get detail url %s', detail_url)
        yield detail_url


def scrape_detail(url):
    """
    Download one detail page.

    :param url: detail page url
    :return: html of the detail page, or None on failure
    """
    return scrape_page(url)


def _first_group(pattern, text):
    """Return the stripped first capture group of pattern in text, or None."""
    match = pattern.search(text)
    return match.group(1).strip() if match else None


def parse_detail(html):
    """
    Parse a detail page with regular expressions.

    :param html: html of a detail page
    :return: dict with keys cover, name, categories, published_at, drama
             and score; a field is None (categories: []) when not found
    """
    score_text = _first_group(_SCORE_PATTERN, html)
    return {
        'cover': _first_group(_COVER_PATTERN, html),
        'name': _first_group(_NAME_PATTERN, html),
        'categories': _CATEGORIES_PATTERN.findall(html),
        'published_at': _first_group(_PUBLISHED_AT_PATTERN, html),
        'drama': _first_group(_DRAMA_PATTERN, html),
        'score': float(score_text) if score_text is not None else None
    }


def parse_detailByPyQuery(html):
    """
    Parse a detail page with PyQuery (CSS selectors) instead of regexes.

    :param html: html of a detail page
    :return: dict with the same keys as parse_detail
    """
    doc = pq(html)
    cover = doc('img.cover').attr('src')
    name = doc('a > h2').text()
    categories = [item.text()
                  for item in doc('.categories button span').items()]
    # The release date is embedded in the text of the matching .info element.
    info_text = doc('.info:contains(上映)').text()
    date_match = _DATE_PATTERN.search(info_text) if info_text else None
    published_at = date_match.group(1) if date_match else None
    drama = doc('.drama p').text()
    score = doc('p.score').text()
    score = float(score) if score else None
    return {
        'cover': cover,
        'name': name,
        'categories': categories,
        'published_at': published_at,
        'drama': drama,
        'score': score
    }


def save_data(data):
    """
    Save one movie record as a JSON file named after the movie.

    :param data: dict produced by parse_detail / parse_detailByPyQuery
    :return: True when the file was written, False when the record has no
             name and was skipped
    """
    name = data.get('name')
    if not name:
        # Without a name every such record would overwrite "None.json".
        logging.warning('skip saving data without a name: %s', data)
        return False
    data_path = f'{RESULTS_DIR}/{name}.json'
    # ensure_ascii=False keeps Chinese characters readable in the file instead
    # of \u escapes; indent=2 pretty-prints with two-space indentation.
    with open(data_path, 'w', encoding='utf-8') as file:
        json.dump(data, file, ensure_ascii=False, indent=2)
    return True


def main():
    """Scrape every index page sequentially in the current process."""
    # TOTAL_PAGE is inclusive, hence the + 1.
    for page in range(1, TOTAL_PAGE + 1):
        mainByMulti(page)


def mainByMulti(page):
    """
    Scrape one index page and save every movie linked from it.

    Used as the per-page worker for multiprocessing.Pool.map.

    :param page: 1-based page number
    """
    index_html = scrape_index(page)
    for detail_url in parse_index(index_html):
        detail_html = scrape_detail(detail_url)
        if detail_html is None:
            # The failure was already logged by scrape_page.
            continue
        data = parse_detail(detail_html)
        logging.info("get detail data %s", data)
        if save_data(data):
            logging.info("save data successfully")


if __name__ == '__main__':
    # Multi-process run: one task per index page.
    # For a single-process run, call main() instead.
    import multiprocessing

    pool = multiprocessing.Pool()
    pages = range(1, TOTAL_PAGE + 1)
    pool.map(mainByMulti, pages)
    pool.close()
    pool.join()