下面是我代码的简化版本:

var page;
var launched = false;

app.post("/test", async(req, res) => {

    if ( launched == false ) {
        const browser = await puppeteer.launch({
            headless: true, /* I've tried with "new" and false too */
        });

        page = await browser.newPage();

        var desiredUrl = "url here";
        await page.goto(desiredUrl);

        /* Stream data from the page */
        await page.exposeFunction('writeData', (data) => {
            console.log("Writing data");
            res.write(data);
        });

        /* End stream */
        await page.exposeFunction('endStream', () => {
                console.log("End stream");
                res.end();
        });

        launched = true;
    }

    await page.evaluate(async ()=>{
        var output = await fetch("/endpoint_here", {
    "headers": {
            /* headers here */
               },
        });

        var reader = output.body.getReader();

        while (true) {
            var { done, value } = await reader.read();
            if (done) {
                window.endStream();
                return;
            }
            
            var decoder = new TextDecoder();
            var decodedData = decoder.decode(value);
            window.writeData(decodedData);
        }
    });

})

然而,这并不起作用.我try 的方法如下:

res在page. evaluate()中不起作用.我试着把res发送到page. evaluate()中,但是它会 destruct 代码.

我试过使用page. exposeFunction()并在那里执行res. write(和res. end()),它工作了,但只是第一次.第二次(以及之后的每一次)我发送post请求时,代码运行正常(它在那些函数中执行console. logs),只是它根本没有执行res. write()和res. end().

我甚至试着让它更新page. evaluate()中的一个全局变量,使用一个代理检测变量的变化,并执行res. write()来写入数据,但在第一次发布请求之后,这也中断了.

唯一解决这个奇怪的问题的唯一方法是重新启动程序,这显然不是一个解决方案.

我还try 将流数据记录到页面中的控制台,并使用page. on('console')将数据res. write()返回到客户端.一次只需要一个请求就能完美地工作.但是,当端点"/test"同时有多个请求时,它会将响应写入两个客户端,而不是只写入发起请求的客户端.

唯一的一件事,就是返回响应后,提取结束,没有流它.但是,我希望它是流.

我被困住了,不知道该怎么办,所以任何帮助都将不胜感激.

推荐答案

我无法重现这个问题.这个问题似乎与您正在击中的端点和/或您的服务器配置有关.我建议分享这些信息或try 建立一个你自己的再现.

这是我的复制try ,以防它对你有帮助.你可以看到代码工作,如果你运行

$ node -v
v20.11.1
$ npm i
$ node sse-endpoint &
$ node server &
$ curl localhost:3001/stream
data: {"chunk":0}

data: {"chunk":1}

data: {"chunk":2}

data: {"chunk":3}

# ... and so on, streamed every second ...

package.json:

{
  "dependencies": {
    "express": "^4.19.2",
    "puppeteer": "^22.6.0"
  }
}

sse—endpoint.js(这是你拦截的远程API的模拟):

const express = require("express");
const app = express();

app.use((req, res, next) => {
  res.setHeader("Access-Control-Allow-Origin", "*");
  next();
});

app.get("/stream", (req, res) => {
  res.writeHead(200, {
    "Connection": "keep-alive",
    "Cache-Control": "no-cache",
    "Content-Type": "text/event-stream",
  });

  let counter = 0;
  const interval = setInterval(() => {
    const chunk = JSON.stringify({chunk: counter++});
    res.write(`data: ${chunk}\n\n`);
  }, 1000);

  res.on("close", () => {
    clearInterval(interval);
    res.end();
  });
});

const listener = app.listen(process.env.PORT || 3000, () =>
  console.log(`SSE endpoint is listening on port ${listener.address().port}`)
);

server.js(这是你的API):

const express = require("express");
const puppeteer = require("puppeteer");

const app = express();
const browserReady = puppeteer.launch();

app.get("/stream", async (req, res) => {
  res.writeHead(200, {
    "Connection": "keep-alive",
    "Cache-Control": "no-cache",
    "Content-Type": "text/event-stream",
  });

  let page;
  try {
    page = await (await browserReady).newPage();
    await page.exposeFunction("writeData", data => {
      res.write(data);
    });
    await page.exposeFunction("endStream", () => {
      res.end();
    });
    await page.evaluate(async () => {
      const output = await fetch(
        "http://localhost:3000/stream"
      );
      const reader = output.body.getReader();

      while (true) {
        const {done, value} = await reader.read();

        if (done) {
          return window.endStream();
        }

        const decoder = new TextDecoder();
        const decodedData = decoder.decode(value);
        window.writeData(decodedData);
      }
    });
  } catch (err) {
    console.error(err);
    res.end();
  } finally {
    await page?.close();
  }
});

const listener = app.listen(process.env.PORT || 3001, () =>
  console.log(
    `Proxy server is listening on port ${listener.address().port}`
  )
);

注意:此代码是作为POC进行演示的,并不一定演示最佳实践.

如果SSE端点是由另一个页面流传输的,这应该不会影响这个再现.你可以让sse-endpoint服务一个HTML文件,然后在运行evaluate()/fetch()之前运行page.goto("localhost:3000");这应该不会有什么不同.确保正确地截取或使用实际端点.很可能,问题就在那里,细节在那个阶段很重要.

取决于你想要实现的目标(我猜是像调试一个GPT聊天源?),无论基本目标是什么,都可能有much simpler way to achieve分—这是为什么全面背景很重要的另一个原因.

Node.js相关问答推荐

nest js控制器方法调用两次

无法从ejs Web应用程序中的正文中提取数据

Sveltekit停靠的应用程序找不到从Build导入的包

Mongoose:如何在文档中推送到Caped(有限大小,滚动窗口)数组?

npm错误;无法解析依赖项:npm ERR!对等webpack@;5.x.x;来自@webpack-cli/serve@2.0.5";

验证器功能在mongoose 中不起作用

Rails 7导入npm yaml包时出现404错误

SvelteKit应用程序立即退出,没有错误

Postgressql的BIGSERIAL自增序列,即使由于唯一约束错误没有创建行,也会自动增加

mongoose findOneAndUpdate 不更新数据库

将代码转换为 ES6 Discord.js 的问题

每秒从套接字传来的数据有哪些存储方式?

在 nodejs 中使用 multer 上传文件返回未定义的 req.file 和空的 req.body

如何使用 superagent/supertest 链接 http 调用?

如果我使用像 express 这样的 node 服务器,是否需要 webpack-dev-server

如何使用 mocha.js 模拟用于单元测试的依赖类?

为什么 Node 控制台不显示功能代码?

要求('babel/register')不起作用

使用 Monit 而不是基本的 Upstart 设置有什么好处?

在 Node.js 中获取终端的宽度