# -*- encoding:utf-8 -*-

'''
@Author : dingjiawen
@Date   : 2023/12/7 15:58
@Usage  : Hands-on scraping of https://spa2.scrape.center/ with Selenium
@Desc   : Detail pages on this site are protected by a token whose generation logic
          is hard to pin down and changes over time, so Selenium is used to drive a
          real browser and sidestep that logic instead of reverse engineering it.
'''

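# Environment assumptions (not stated in the original script): Selenium 4.x, whose
# find_element(By..., ...) style API is used below, with Chrome installed and a
# matching ChromeDriver available (recent Selenium versions resolve it via Selenium Manager).
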
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from os import makedirs
from os.path import exists
import logging
from urllib.parse import urljoin
import json

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s: %(message)s')

INDEX_URL = 'https://spa2.scrape.center/page/{page}'
Timeout = 10      # seconds to wait for each expected condition
Total_page = 10   # number of index pages to crawl
RESULTS_DIR = 'result'

# Create the results directory if it does not already exist
exists(RESULTS_DIR) or makedirs(RESULTS_DIR)

# Options that help avoid detection on sites that block automated browsers
options = webdriver.ChromeOptions()
options.add_experimental_option('excludeSwitches', ['enable-automation'])
options.add_experimental_option('useAutomationExtension', False)

# Create the browser and an explicit wait with the timeout configured above
browser = webdriver.Chrome(options=options)
wait = WebDriverWait(browser, Timeout)

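# Optional hardening (an assumption, not part of the original script): some sites also
# check navigator.webdriver, which can be masked through CDP before any page is loaded.
# browser.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
#     'source': 'Object.defineProperty(navigator, "webdriver", {get: () => undefined})'
# })
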
# Load a page and wait until the given expected condition is satisfied
def scrape_page(url, condition, locator):
    logging.info('scraping %s', url)
    try:
        browser.get(url)
        # Block until the condition holds for the locator, or raise
        # TimeoutException after Timeout seconds
        wait.until(condition(locator))
    except TimeoutException:
        logging.error('error occurred while scraping %s', url, exc_info=True)


def scrape_index(page):
    url = INDEX_URL.format(page=page)
    # Wait until every item under #index on the list page is visible
    scrape_page(url, EC.visibility_of_all_elements_located, locator=(By.CSS_SELECTOR, '#index .item'))


def parse_index():
    # Each .name element is the <a> tag linking to a movie's detail page
    titles = browser.find_elements(By.CSS_SELECTOR, '#index .item .name')
    for title in titles:
        href = title.get_attribute("href")
        yield urljoin(INDEX_URL, href)

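# Note (assumption based on the module docstring, not verified here): the hrefs
# collected above already embed the dynamically generated detail-page identifier,
# which is the token-like piece that is hard to reproduce without a real browser.

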
def scrape_detail(url):
    # A detail page is considered loaded once its <h2> title is visible
    return scrape_page(url, EC.visibility_of_element_located, (By.TAG_NAME, 'h2'))


def parse_detail():
    # Extract the fields of the currently loaded detail page
    url = browser.current_url
    name = browser.find_element(By.TAG_NAME, 'h2').text
    category = [element.text for element in browser.find_elements(By.CSS_SELECTOR, '.categories button span')]
    cover = browser.find_element(By.CLASS_NAME, 'cover').get_attribute("src")
    score = browser.find_element(By.CLASS_NAME, 'score').text
    drama = browser.find_element(By.CSS_SELECTOR, '.drama p').text
    return {
        "url": url,
        "name": name,
        "category": category,
        "cover": cover,
        "score": score,
        "drama": drama
    }


def save_data(data):
    # Save each movie as <RESULTS_DIR>/<movie name>.json
    name = data.get('name')
    data_path = f'{RESULTS_DIR}/{name}.json'
    with open(data_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

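# For reference (illustrative, not part of the original script): each saved file,
# e.g. result/<movie name>.json, holds one JSON object with the url, name, category,
# cover, score and drama fields returned by parse_detail().

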
def main():
    try:
        for page in range(1, Total_page + 1):
            scrape_index(page)
            # Once the index page has rendered, collect the detail page URLs.
            # parse_index() is a generator reading from the live DOM, so materialize
            # it into a list before navigating away from the index page.
            detail_urls = list(parse_index())
            # logging.info('detail data %s', list(detail_urls))
            # Visit every detail URL and extract the page's information
            for detail_url in detail_urls:
                scrape_detail(detail_url)
                detail_info = parse_detail()
                logging.info('detail info %s', detail_info)
                save_data(detail_info)
    finally:
        # close() shuts the current window; browser.quit() would also end the driver session
        browser.close()


if __name__ == '__main__':
    main()