Asyncio版本的readline()(1)允许从流中异步读取一行.但是,如果它遇到长度超过限制的行,它将引发异常(2).目前还不清楚在提出这样的例外情况后如何恢复阅读.

我想要一个类似于readline()的函数,它只是丢弃超出限制的部分行,并继续读取,直到流结束.是否存在这样的方法?如果没有,怎么写呢?

1:https://docs.python.org/3/library/asyncio-stream.html#streamreader

2:https://github.com/python/cpython/blob/3.12/Lib/asyncio/streams.py#L549

推荐答案

asyncio.StreamReader.readline的标准库实现几乎就是您想要的.该函数在缓冲区溢出时引发ValueError,但它首先从缓冲区中删除一块数据并将其丢弃.如果你搭上ValueError,然后一直拨打readline,这条小溪还会继续往前走.但你会丢失第一块数据,这不是你想要的.

为了实现你想要的,你可以调用一个稍微低级的函数StreamReader.readuntil(separator). 如果你叫reader.readuntil('\n'),它就等于reader.readline. 在溢出时,此函数引发asyncio.LimitOverrunError,它有一个名为consumed的实例变量. 这是一个整数,表示缓冲区内容的当前长度.

这里有几个奇怪的细节.不管变量的名称是什么,这些字节并没有被"消耗"--它们仍然在缓冲区中.为了保持流的运行,您必须自己使用它们,这可以通过调用函数StreamReader.readexactly来实现.第二件奇怪的事情是,LimitOverrunError.consumed的值可以大于最初传递给open_connection构造函数的关键字参数limit=的值.该参数本应设置输入缓冲区的大小,但它似乎只是设置了提高LimitOverrunError的阈值.因此,缓冲区可能已经包含分隔符,但幸运的是LimitOverrunError.consumed中的字符计数不包括分隔符.因此,调用readexactly不会消耗分隔符.

您可以通过调用readuntil来实现您想要的效果,并添加处理LimitOverrunError的逻辑. 在异常处理程序中,保存第一个数据块并继续从流中读取. 当您最终遇到行尾时,返回您保存的第一个块.

这是一个完整的工作演示程序.为了测试代码,我在asyncio.Open_Connection中设置了Limit=512.服务器传输一个比这个更长的行,然后是一些更短的行.客户端读取每一行,不会引发任何异常.

您要求的功能是readline_no_limit.

import asyncio

_PORT = 54242

MY_LIMIT = 512

async def readline_no_limit(reader):
    """ Return a bytes object.  If there has not been a buffer
    overrun the returned value will end with include the line terminator,
    otherwise not.

    The length of the returned value may be greater than the limit
    specified in the original call to open_connection."""

    discard = False
    first_chunk = b''
    while True:
        try:
            chunk = await reader.readuntil(b'\n') 
            if not discard:
                return chunk
            break
        except asyncio.LimitOverrunError as e:
            print(f"Overrun detected, buffer length now={e.consumed}")
            chunk = await reader.readexactly(e.consumed)
            if not discard:
                first_chunk = chunk
            discard = True
    return first_chunk

async def client():
    await asyncio.sleep(1.0)
    reader, writer = await asyncio.open_connection(host='localhost', 
        port=_PORT, limit=MY_LIMIT)
    writer.write(b"Hello\n")
    while True:
        line = await readline_no_limit(reader)
        print(f"Received {len(line)} bytes, first 40: {line[:40]}")
        if line == b"End\n":
            break
    writer.write(b"Quit\n")

async def got_cnx(reader, writer):
    while True:
        msg = await reader.readline()
        if msg == b"Quit\n":
            break
        if msg != b"Hello\n":
            continue
        long_line = ' '.join([format(x, "x") for x in range(1000)])
        writer.write(bytes(long_line, encoding='utf8'))
        writer.write(b"\n")
        writer.write(b"Short line\n")
        writer.write(b"End\n")

async def main():
    def shutdown(_future):
        print("\nClient quit, server shutdown")
        server.close()
    client_task = asyncio.create_task(client())
    server = await asyncio.start_server(got_cnx, host='localhost', port=_PORT,
        start_serving = False)
    client_task.add_done_callback(shutdown)
    try:
        await server.serve_forever()
    except asyncio.CancelledError:
        print("Server closed normally")

if __name__ == "__main__":
    asyncio.run(main())

输出:

Overrun detected, buffer length now=3727
Received 3727 bytes, first 40: b'0 1 2 3 4 5 6 7 8 9 a b c d e f 10 11 12'
Received 11 bytes, first 40: b'Short line\n'
Received 4 bytes, first 40: b'End\n'

Client quit, server shutdown
Server closed normally

PYTHON 3.11,Ubuntu

Python相关问答推荐

三个给定的坐标可以是矩形的点吗

为什么tkinter框架没有被隐藏?

Python json.转储包含一些UTF-8字符的二元组,要么失败,要么转换它们.我希望编码字符按原样保留

追溯(最近最后一次调用):文件C:\Users\Diplom/PycharmProject\Yolo01\Roboflow-4.py,第4行,在模块导入roboflow中

' osmnx.shortest_track '返回有效源 node 和目标 node 的'无'

如何在类和classy-fastapi -fastapi- followup中使用FastAPI创建路由

SQLAlchemy Like ALL ORM analog

Pandas GroupBy可以分成两个盒子吗?

使用BeautifulSoup抓取所有链接

提高算法效率的策略?

Pandas—堆栈多索引头,但不包括第一列

30个非DATETIME天内的累计金额

如何将一组组合框重置回无 Select tkinter?

为罕见情况下的回退None值键入

PYTHON中的pd.wide_to_long比较慢

替换包含Python DataFrame中的值的<;

如何在Python中创建仅包含完整天数的月份的列表

如何在PYTHON中向单元测试S Side_Effect发送额外参数?

PYODBC错误(SQL包含-26272个参数标记,但提供了235872个参数,HY 000)

`Convert_time_zone`函数用于根据为极点中的每一行指定的时区检索值