我有以下代码.它打开了无头浏览器,我还看到页面正在滚动,但parse方法中的响应对象没有任何HTML.当我不使用自动滚动时,这个蜘蛛可以完美地工作.

该代码仅用于从该网站提取产品名称和产品价格.

import scrapy
import re
from scrapy_playwright.page import PageMethod
from bs4 import BeautifulSoup


def should_abort_request(req):
   if req.resource_type == "image":
     return True
   if req.method.lower() == 'post':
     return True

return False


scrolling_script = """
  const scrolls = 8
  let scrollCount = 0

  // scroll down and then wait for 5s
  const scrollInterval = setInterval(() => {
    window.scrollTo(0, document.body.scrollHeight)
    scrollCount++

    if (scrollCount === numScrolls) {
      clearInterval(scrollInterval)
    }
  }, 5000)
  """


class AuchanSpider(scrapy.Spider):
  name = 'auchan'
  custom_settings = {
    'PLAYWRIGHT_ABORT_REQUEST': should_abort_request
  }
  start_urls = ['https://zakupy.auchan.pl/shop/list/8029?shType=id']

  def start_requests(self):
    for url in self.start_urls:
        yield scrapy.Request(
            url=url,
            callback=self.parse,
            meta={
                "playwright": True,
                "playwright_include_page": True,
                "playwright_page_methods": [
                    PageMethod("evaluate", scrolling_script),
                    #PageMethod("wait_for_timeout", 30000),
                    PageMethod("wait_for_selector", "._1E5b _2I59 _1wkJ _3YFw igxN _7Zx6 Eb4X _390_"),
                    PageMethod("wait_for_selector", "._1E5b _2I59 _1wkJ _3YFw igxN _7Zx6 Eb4X _390_:nth-child(60)")
                ],
            },
            errback=self.close_page,
            cb_kwargs=dict(main_url=url, page_number=0),
        )

async def parse(self, response, main_url, page_number):
    soup = BeautifulSoup(response.text, 'html.parser')
    product_containers = soup.find_all('div', class_='_1E5b _2I59 _1wkJ _3YFw igxN _7Zx6 Eb4X _390_')
    for product_container in product_containers:
        price = product_container.find(class_='_1-UB _1Evs').get_text()
        price = re.sub(r"[\n\t\s]*", "", price)
        yield {
            'productName': product_container.find(class_='_1DGZ').get_text(),
            'price': price
        }

async def close_page(self, failure):
    page = failure.request.meta["playwright_page"]
    await page.close()

推荐答案

我会比你更直接地处理这个问题.没有必要使用BeautifulSoup,因为编剧已经可以在实时页面上 Select 元素了.我也不确定是否有必要使用ScRapy,但如果您愿意,可以将以下编剧代码改编为Scrapy:

import re
from playwright.sync_api import sync_playwright  # 1.37.0
from time import sleep


with sync_playwright() as p:
    browser = p.chromium.launch(headless=False)
    page = browser.new_page()
    url = "https://zakupy.auchan.pl/shop/list/8029?shType=id"
    page.goto(url)
    page.click("#onetrust-accept-btn-handler")
    page.click("._3YI0")
    text = page.locator("._3MDH").text_content().strip()
    expected = int(re.search(r"\d+$", text).group())
    records = {}

    while len(records) < expected:
        page.keyboard.press("PageDown")
        sleep(0.2)  # save a bit of CPU
        items = page.eval_on_selector_all(
            "._1DGZ",
            """els => els.map(e => ({
              href: e.href,
              text: e.textContent,
            }))""",
        )

        for x in items:
            # assume hrefs are unique
            records[x["href"]] = x

    print(records)
    browser.close()

这段代码将删除cookie和广告横幅,然后按PageDown直到没有更多的记录可供提取.我只是从DOM中提取标题和链接,但如果需要,您可以添加更多信息.

请注意,我使用的是更简单的 Select 器. Select 器中的假设越多,如果有任何假设不成立,它就越有可能失败.在您的例子中,尽管问题是使用空格而不是.来标识一个元素上的多个类(空格表示祖先),但一开始不使用这么多类就可以避免混淆.首先在浏览器控制台中判断您的 Select 器是否正常,请记住,考虑到不同的环境,这并不能保证它们在PlayWriter中也能工作.浏览器可以生成样本 Select 器.尽管这些通常过于具体,但它们至少是有效的,并且可以改进以使其更可靠.

此外,我意识到使用页面底部的文本"Załadowano361 produkt(Y)na 361"来确定所有记录都已被抓取的时间可能会更好,但我将把它作为练习.


另一种方法是拦截请求,而不是抓取文档,这会提供更多的数据(对于所提供的页面,大约2 MB):

import json
from playwright.sync_api import sync_playwright
from time import sleep


def scrape(page):
    url = "https://zakupy.auchan.pl/shop/list/8029?shType=id"
    items = []
    done = False

    def handle_response(response):
        nonlocal done
        api_url = "https://zakupy.auchan.pl/api/v2/cache/products"

        if response.url.startswith(api_url):
            data = response.json()
            items.append(data)

            if data["pageCount"] == data["currentPage"]:
                with open("out.json", "w") as f:
                    json.dump(items, f)
                    done = True

    page.on("response", handle_response)
    page.goto(url)
    page.click("#onetrust-accept-btn-handler")
    page.click("._3YI0")

    while not done:
        page.keyboard.press("PageDown")
        sleep(0.2)  # save a bit of CPU


with sync_playwright() as p:
    browser = p.chromium.launch(headless=True)
    scrape(browser.new_page())
    browser.close()

然后,您可以使用jq遍历JSON,以提取您想要的任何信息,比如名称:

jq '.[].results | .[] | .defaultVariant.name' < out.json

或在Python中:

for x in items:
    for x in x["results"]:
        print(x["defaultVariant"]["name"])

使用列表比较:

[x["defaultVariant"]["name"] for y in items for x in y["results"]]

请注意,上面的版本遗漏了记录的第一页,可以从DOM中抓取,也可以使用从另一个API请求复制的头部进行单独的请求.

然而,一旦进入请求拦截领域,您就可以劫持对其API的请求,并将其连接到返回500个项目,从而更快速、更轻松地收集所有数据:

import json
from playwright.sync_api import sync_playwright
from time import sleep


def scrape(page):
    url = "https://zakupy.auchan.pl/shop/list/8029?shType=id"
    api_url = "https://zakupy.auchan.pl/api/v2/cache/products"
    new_url = "https://zakupy.auchan.pl/api/v2/cache/products?listId=8029&itemsPerPage=500&page=1&cacheSegmentationCode=019_DEF&hl=pl"
    done = False

    def handle(route, request):
        route.continue_(url=new_url)

    page.route("https://zakupy.auchan.pl/api/v2/cache/products*", handle)

    def handle_response(response):
        nonlocal done

        if response.url.startswith(api_url):
            with open("out1.json", "w") as f:
                json.dump(response.json(), f)
                done = True

    page.on("response", handle_response)
    page.goto(url)
    page.click("#onetrust-accept-btn-handler")
    page.click("._3YI0")

    while not done:
        page.keyboard.press("PageDown")
        sleep(0.2)  # save a bit of CPU


with sync_playwright() as p:
    browser = p.chromium.launch(headless=True)
    scrape(browser.new_page())
    browser.close()

使用抓取名称的示例,可以按如下方式处理此 struct :

jq '.results | .[] | .defaultVariant.name' < out1.json

或在Python中:

for x in data["results"]:
    print(x["defaultVariant"]["name"])

Python相关问答推荐

为什么图像结果翻转了90度?

使用子字符串动态更新Python DataFrame中的列

两极:滚动组,起始指数由不同列设置

从今天起的future 12个月内使用Python迭代

如何销毁框架并使其在tkinter中看起来像以前的样子?

有条件地采样我的大型DF的最有效方法

Pandas 填充条件是另一列

为什么我的Python代码在if-else声明中的行之前执行if-else声明中的行?

运行总计基于多列pandas的分组和总和

从dict的列中分钟

如何将Docker内部运行的mariadb与主机上Docker外部运行的Python脚本连接起来

Python解析整数格式说明符的规则?

Pandas:将多级列名改为一级

对象的`__call__`方法的setattr在Python中不起作用'

利用Selenium和Beautiful Soup实现Web抓取JavaScript表

使用BeautifulSoup抓取所有链接

numpy.unique如何消除重复列?

如何在两列上groupBy,并使用pyspark计算每个分组列的平均总价值

在matplotlib中使用不同大小的标记顶部添加批注

Gekko中基于时间的间隔约束