一、背景

在初次使用 ChatGPT 时,我就被打字机的视觉效果吸引。总是感觉似曾相识,因为经常在一些科幻电影中看到,高级文明回传的信息在通讯设备的屏幕上以打字机效果逐步出现,在紧张的氛围下,输出人类可读的内容,拉动着观众的神经,一步步将故事情节拉向高潮。

在很早之前我就了解过 Server-Sent Events 这门服务端推送技术,当时看过很多博客介绍其原理和使用场景,最后也没有留下深刻的印象。这一次 ChatGPT 的使用感受带给我一些触动,也激发了对技术的思考,究竟什么样的技术是一门好的技术 ”需要一个杀手级的应用,现实应用会促进技术发展“,技术不是冰冷无情的,贴近生活挖掘其实用价值,一样可以表现出感性的艺术效果。

二、SSE 工作原理

Server-Sent Events(SSE)是一种允许服务器单向推送信息到客户端的技术,与传统的请求/响应模式相比,这种模式更加适合处理实时数据。以下是一些常见的 Server-Sent Events 应用场景:

  • ChatGPT 大型语言模型处理自然语言需要大量的计算资源和时间,响应速度肯定比普通的 HTTP 请求要慢的多。对于这种单项对话场景,ChagtGPT 将先计算出的数据 “推送” 给用户,边计算边返回,提升用户体验。
  • 实时通知:SSE 非常适合于实时通知的场景,例如电子邮件或社交媒体通知。一旦有新消息,服务器可以立即将其推送给客户端,而无需客户端定时轮询检查新消息。
  • 实时数据流:在金融服务、股票市场、体育比赛等场景中,SSE 可以用于实时推送数据流,如股票价格等。

2.1 SSE 工作原理

SSE 的基本工作原理是客户端首先向服务器发送一个 HTTP 请求,然后服务器保持这个连接打开,并周期性地通过这个连接向客户端发送数据。每个数据块都是一个独立的消息,每个消息都以一个空行结束。

使用 SSE 的主要步骤如下:

  1. 客户端创建一个新的EventSource对象,参数是服务器的URL。
let source = new EventSource("http://xxx/chat/completions");
  1. 服务器返回一个 HTTP 响应,Content-Type 为 "text/event-stream",并保持连接打开。
HTTP/1.1 200 OK
Content-Type: text/event-stream
Connection: keep-alive
Cache-Control: no-cache
  1. 服务器通过打开的连接向客户端发送消息。每个消息都包含一些数据,数据可以是任何格式的文本,比如 JSON。消息以两个连续的换行符结束。
data: This is a message\n\n
  1. 客户端监听 "message" 事件,当收到新的消息时,这个事件会被触发。
source.onmessage = function(event) {
  console.log(event.data);
};

注意,由于 SSE 是基于 HTTP 的,因此它受到同源策略的限制。如果你需要进行跨域 SSE,你需要在服务器端设置适当的 CORS 头部信息。另外,SSE 只支持文本数据,不支持二进制数据。如果你需要发送二进制数据,你可能需要考虑使用 WebSockets。

2.2 Fetch API 模拟 SSE

Fetch API 是一种通用的 HTTP 请求和响应模型,它可以用于发送和接收任何类型的 HTTP 请求,支持文本和二进制数据。由于其对流(Stream)的支持,可以模拟 Server-Sent Events (SSE),需要手动处理重连和流式数据。

在某些情况下,你可能会选择使用 Fetch API 模拟 SSE,而不是直接使用 SSE:

  • 发送二进制数据:如果你需要发送或接收二进制数据,你必须使用 Fetch API 或其他技术,因为 SSE 只支持文本数据。
  • 双向通信:如果你需要进行双向通信,你必须使用 Fetch API 或其他技术,因为 SSE 只支持单向通信。
  • 更大的灵活性:Fetch API 提供了更大的灵活性,例如,你可以控制请求头、请求方法、响应处理等。
const url = 'https://your-server.com/events';

fetch(url)
  .then(response => {
    const reader = response.body.getReader();
    const decoder = new TextDecoder();

    // done 为数据是否接收完成 boolean 值
    // value 为接收到的数据, Uint8Array 格式
    return reader.read().then(function processMessage({ done, value }) {
      if (done) {
        return;
      }
      console.log(decoder.decode(value));
      return reader.read().then(processMessage);
    });
  });

在这个示例中,我们使用 fetch() 函数发起 HTTP 请求。然后,使用 response.body.getReader() 获取一个可读流的 reader,用来读取数据。还创建了一个 TextDecoder 对象,用来将二进制数据解码为文本,然后打印出来。然后,再次调用 reader.read() 方法,等待下一批数据。

这样,就可以使用 Fetch API 来接收服务器推送的实时更新,就像使用 SSE 一样,ChatGPT 采用的就是这种实现。

三、SSE 服务端

Server-Sent Events (SSE) 是一种服务器推送技术,允许服务器向客户端发送实时更新。在服务器端,我们需要创建一个 endpoint,发送正确的 HTTP 头部并持续推送数据。

func main() {
	http.HandleFunc("/v1/chat/completions", func(w http.ResponseWriter, r *http.Request) {
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
			return
		}

		// 事件流媒体 (MIME 类型)
		w.Header().Set("Content-Type", "text/event-stream")
	  // 阻止缓存
		w.Header().Set("Cache-Control", "no-cache")
    // 保持长连接
		w.Header().Set("Connection", "keep-alive")
		// 跨域支持
		w.Header().Set("Access-Control-Allow-Origin", "*")

		phrase := []string{"dolor ", "sit amet", ", consectetur", " adipiscing elit. ", "Ut consequat", " diam at ", "justo efficitur", " mattis."}
		for _, delta := range phrase {
			// 数据内容用 data 表示, 如果数据很长, 可以分成多行用 \n 结尾,
			fmt.Fprintf(w, "data: %s\n", delta)
			flusher.Flush()
			time.Sleep(200 * time.Millisecond)
		}
		// 最后一行使用 \n\n 结尾
		fmt.Fprintf(w, "data: %s\n\n", "[DONE]")
	})

	if err := http.ListenAndServe("127.0.0.1:8080", nil); err != nil {
		panic(err)
	}
}

在 Go 语言中,http.Flusher 是一个接口,它允许 HTTP 响应数据在写入后立即发送到客户端,而不是等待所有响应数据都写入后再一次性发送。这对于长连接和服务器推送的场景非常有用。

// Flush 将用户层的数据写入到 TCP 缓冲区,内核会尽快将 TCP 缓存区数据发送出去
type Flusher interface {
    Flush()
}

扩展:每个 TCP socket 连接在内核中都有一个发送缓存区和接收缓冲区

发送缓冲区用于暂存应用程序写入的数据,直到数据被发送出去并得到对方的确认。接收缓冲区用于暂存收到的数据,直到应用程序读取这些数据。

当应用程序调用发送数据的系统调用(如 write 或 send)时,数据会被复制到发送缓冲区。然后,内核会尽快将这些数据发送出去。但具体发送的时机取决于许多因素,包括但不限于以下几点:

  • Nagle 算法:为了减少小包在网络上的传输,Nagle 算法规定,除非上一个发送的数据包已经得到确认,否则不能发送新的数据包。所以,如果发送缓冲区中的数据量较小,并且上一个数据包还未得到确认,数据可能会在缓冲区中等待。
  • TCP 拥塞控制:TCP 协议通过拥塞控制算法,动态地调整发送速率,以避免网络拥塞。如果网络拥塞,数据可能会在发送缓冲区中等待,直到网络状况改善。
  • 接收方的接收窗口:接收方通过 TCP 的滑动窗口机制,告诉发送方它的接收缓冲区还有多少空间。如果接收方的接收窗口满了,数据必须在发送缓冲区等待,直到接收方的接收窗口有空间。

当数据成功发送并得到确认后,内核会从发送缓冲区中删除这些数据,释放缓冲区空间。

四、实现一个打字机效果

上面我们讨论下 SSE 的工作原理,也知道由于 Web API EventSource 的局限性,ChatGPT 采用了 Fetch API 来手动处理和解析 SSE 服务端端点接收的数据流。那么接下来通过一个简单的打字机案例,加深对所学内容的理解。

这里借鉴了 《ChatGPT 打字机消息回复实现原理》 文章中的前端代码,在其基础上增加了消息处理逻辑,用于适配上面的 SSE 服务端。

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Chat Completion</title>
</head>
<body>
    <button onclick="connectFetch()">建立 fetchSSE 连接</button>
    <button onclick="closeSSE()">断开 fetchSSE 连接</button>
    <br/>
    <br/>
    <div id="text"></div>
    <script>
        const divTyping = document.getElementById('text')
        let ctrl

        const connectFetch = () => {
            ctrl = new AbortController()
            fetchEventSource('http://127.0.0.1:8080/v1/chat/completions', {
                method: 'POST',
                body: JSON.stringify({
                    prompt: 'Lorem ipsum',
                    max_tokens: 20,
                    stream: true,
                }),
                signal: ctrl.signal,
                onopen: () => {
                    console.log('Connection successful.')
                },
                onclose: () => {
                    console.log('Connection closed.')
                },
                onmessage: (delta) => {
                    let prefix = 'data: '
                    if (!delta.startsWith(prefix)) {
                        return
                    }
                    delta = delta.slice(prefix.length)
                    delta = delta.replace(/\n$/, '')
                    if (delta === '[DONE]\n') {
                        return
                    }
                    divTyping.innerText += delta
                }
            })
        }

        const closeSSE = () => {
            if (ctrl) {
                ctrl.abort()
                ctrl = null
            }
        }

        const fetchEventSource = (url, options) => {
            fetch(url, options).then(resp => {
                if (resp.status === 200) {
                    options.onopen && options.onopen()
                    return resp.body
                }
            }).then(rb => {
                const reader = rb.getReader()
                const push = () => {
                    // done 为数据是否接收完成 boolean 值
                    // value 为接收到的数据, Uint8Array 格式
                    return reader.read().then(({done, value}) => {
                        if (done) {
                            options.onclose && options.onclose()
                            return
                        }
                        options.onmessage && options.onmessage(new TextDecoder().decode(value))
                        return push()
                    });
                }
                // 开始读取流信息
                return push()
            }).catch((e) => {
                options.error && options.error(e)
            })
        }
    </script>
</body>
</html>

五、参考资料

作者:|MarsZuo|,原文链接: https://www.cnblogs.com/marszuo/p/17397987.html

文章推荐

服务器遭受攻击之后的常见思路

C# 如何设计一个好用的日志库?【架构篇】

CentOS7---Nginx安装并配置虚拟主机

从源码解析Go exec timeout 实现机制

如何查看当前的 ffmpeg 二进制程序,是否支持 cuda 加速

K8S 性能优化 - OS sysctl 调优

排序算法 Quick Sort

K8S 性能优化 - OS sysctl 调优

lambda 表达式学习笔记

ReactQuery系列文章- 2. 数据转换

【clickhouse专栏】单机版的安装与验证

面试突击51:为什么单例一定要加 volatile?