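The standard library implementation of asyncio.StreamReader.readline is almost what you want. That function raises ValueError on a buffer overrun, but first it removes a chunk of data from the buffer and discards it. If you catch the ValueError and keep calling readline, the stream keeps moving forward. But you lose the first chunk of data, which is not what you want.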
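The data loss is easy to reproduce without a network connection. The sketch below is only an illustration: it builds a bare StreamReader with a deliberately tiny limit and fills it by hand with feed_data and feed_eof, which are the methods the transport normally calls. The function name demo_readline_overrun and the limit of 16 are made up for this example.

```python
import asyncio


async def demo_readline_overrun():
    # Hypothetical in-memory setup: a bare StreamReader with a tiny limit.
    # feed_data()/feed_eof() are normally called by the transport; they are
    # used here only to avoid opening a socket.
    reader = asyncio.StreamReader(limit=16)
    reader.feed_data(b"x" * 40 + b"\nshort\n")
    reader.feed_eof()

    overrun = False
    try:
        await reader.readline()
    except ValueError:
        # By the time ValueError is raised, the 40-byte line has already
        # been removed from the buffer and thrown away.
        overrun = True

    # The stream keeps going, but only the following line is left.
    next_line = await reader.readline()
    return overrun, next_line


if __name__ == "__main__":
    print(asyncio.run(demo_readline_overrun()))  # (True, b'short\n')
```

The second readline call succeeds, but the 40 bytes of the long line are gone for good.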
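To get what you want, you can call a slightly lower-level function, StreamReader.readuntil(separator). Calling reader.readuntil(b'\n') is equivalent to calling reader.readline(), except for what happens on overrun. (Note that the separator must be a bytes object, not a str.) On overrun, readuntil raises asyncio.LimitOverrunError, which has an instance variable named consumed. This is an integer giving the number of bytes sitting in the buffer ahead of the separator; if no separator has arrived yet, that is the current length of the buffer contents.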
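As a minimal sketch of that behaviour, assuming the same hand-fed StreamReader trick (feed_data is what the transport normally calls, and the name demo_limit_overrun and the limit of 16 are invented for this example):

```python
import asyncio


async def demo_limit_overrun():
    # Hypothetical in-memory reader with a tiny limit, for illustration only.
    reader = asyncio.StreamReader(limit=16)
    reader.feed_data(b"hi\n" + b"a" * 40)  # one short line, then 40 bytes with no newline

    # A line that fits within the limit comes back just as readline() would return it.
    short_line = await reader.readuntil(b"\n")

    consumed = None
    try:
        await reader.readuntil(b"\n")
    except asyncio.LimitOverrunError as e:
        # No separator in the buffer yet, so this is the buffer length.
        consumed = e.consumed
    return short_line, consumed


if __name__ == "__main__":
    print(asyncio.run(demo_limit_overrun()))  # (b'hi\n', 40)
```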
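A couple of peculiar details are in play here. First, despite the name of the variable, these bytes have not been "consumed": they are still in the buffer. To keep the stream going you have to consume them yourself, which you can do by calling StreamReader.readexactly. Second, the value of LimitOverrunError.consumed can be greater than the value of the keyword argument limit= originally passed to open_connection. That argument is supposed to set the size of the input buffer, but it appears merely to set the threshold for raising LimitOverrunError. Because of this, the buffer may already contain the separator. Fortunately, the byte count in LimitOverrunError.consumed does not include the separator, so the call to readexactly will not consume it.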
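Both details can be checked in isolation. The following is a sketch under the same assumptions as before: a hand-fed StreamReader (feed_data and feed_eof are normally called by the transport), a made-up limit of 16, and a hypothetical function name demo_drain_overrun.

```python
import asyncio


async def demo_drain_overrun():
    limit = 16  # arbitrary small limit chosen for the demonstration
    reader = asyncio.StreamReader(limit=limit)
    reader.feed_data(b"a" * 40 + b"\nnext\n")
    reader.feed_eof()

    consumed = 0
    try:
        await reader.readuntil(b"\n")
    except asyncio.LimitOverrunError as e:
        consumed = e.consumed

    # Nothing was removed from the buffer: a second attempt fails the same way.
    try:
        await reader.readuntil(b"\n")
        still_buffered = False
    except asyncio.LimitOverrunError:
        still_buffered = True

    # Consume the oversized chunk ourselves; the separator is left behind.
    head = await reader.readexactly(consumed)
    terminator = await reader.readuntil(b"\n")
    following = await reader.readline()
    return consumed > limit, still_buffered, len(head), terminator, following


if __name__ == "__main__":
    print(asyncio.run(demo_drain_overrun()))
    # (True, True, 40, b'\n', b'next\n')
```

After readexactly, the next readuntil returns just the bare terminator, which is why the handler has to read once more to finish the line.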
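You can get the behavior you want by calling readuntil and adding logic to handle LimitOverrunError. In the exception handler, save the first chunk of data and keep reading from the stream. When you finally reach the end of the line, return the first chunk you saved.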
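Here is a complete working demonstration program. To test the code I set limit=512 in asyncio.open_connection. The server transmits one line that is longer than this, followed by some shorter lines. The client reads every line without any exception being raised.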
The function you are asking for is readline_no_limit.
import asyncio

_PORT = 54242
MY_LIMIT = 512


async def readline_no_limit(reader):
    """Return a bytes object.

    If there has not been a buffer overrun, the returned value ends with
    the line terminator; otherwise it does not.

    The length of the returned value may be greater than the limit
    specified in the original call to open_connection.
    """
    discard = False
    first_chunk = b''
    while True:
        try:
            chunk = await reader.readuntil(b'\n')
            if not discard:
                # Normal case: the whole line fit within the limit.
                return chunk
            # End of an overlong line: drop the tail, return the saved chunk.
            break
        except asyncio.LimitOverrunError as e:
            print(f"Overrun detected, buffer length now={e.consumed}")
            # The bytes are still in the buffer; consume them ourselves.
            chunk = await reader.readexactly(e.consumed)
            if not discard:
                first_chunk = chunk
            discard = True
    return first_chunk


async def client():
    # Give the server a moment to start listening.
    await asyncio.sleep(1.0)
    reader, writer = await asyncio.open_connection(host='localhost',
                                                   port=_PORT, limit=MY_LIMIT)
    writer.write(b"Hello\n")
    while True:
        line = await readline_no_limit(reader)
        print(f"Received {len(line)} bytes, first 40: {line[:40]}")
        if line == b"End\n":
            break
    writer.write(b"Quit\n")


async def got_cnx(reader, writer):
    while True:
        msg = await reader.readline()
        if msg == b"Quit\n":
            break
        if msg != b"Hello\n":
            continue
        # One line far longer than MY_LIMIT, then two short ones.
        long_line = ' '.join([format(x, "x") for x in range(1000)])
        writer.write(bytes(long_line, encoding='utf8'))
        writer.write(b"\n")
        writer.write(b"Short line\n")
        writer.write(b"End\n")


async def main():
    def shutdown(_future):
        print("\nClient quit, server shutdown")
        server.close()

    client_task = asyncio.create_task(client())
    server = await asyncio.start_server(got_cnx, host='localhost', port=_PORT,
                                        start_serving=False)
    client_task.add_done_callback(shutdown)
    try:
        await server.serve_forever()
    except asyncio.CancelledError:
        print("Server closed normally")


if __name__ == "__main__":
    asyncio.run(main())
Output:
Overrun detected, buffer length now=3727
Received 3727 bytes, first 40: b'0 1 2 3 4 5 6 7 8 9 a b c d e f 10 11 12'
Received 11 bytes, first 40: b'Short line\n'
Received 4 bytes, first 40: b'End\n'
Client quit, server shutdown
Server closed normally
Python 3.11, Ubuntu