我正在单元测试Python代码.它有一个spocio循环.该循环中的两个主要函数具有无尽的while循环.我已经测试了代码的非Inc部分.我想知道单元测试这两个功能的最佳策略是什么:

import asyncio

def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run_async_tasks())
    .
    .
    

async def run_async_tasks():
    # initialize loop and its variables
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue(maxsize=100)

    # Creating Asyncio tasks
    task1 = loop.create_task(receive_data(queue), name="receive_data")
    task2 = loop.create_task(analyse_data(queue), name="analyse_data")

    await asyncio.gather(task1, task2)

async def receive_data(queue):
    # Data packets are yielded from an non-ending stream by sync_receive_data_packets
    for data_packet in sync_receive_data_packets():
        await queue.put(data_packet)

async def analyse_data(queue):
    while not termination_signal_received():
        data_packet = await queue.get()
        sync_data_analysis(data_packet)
        queue.task_done()

我的问题是是否/如何可以对receive_dataanalyse_data进行单元测试.

为了简单起见,用于读取数据(sync_receive_data_packets)和执行分析(sync_data_analysis)的主要逻辑采用非可编程功能.它们已成功测试.我不知道如何测试await queue.putawait queue.get()部分,尤其是当它们处于无限循环中时. 事实上,我想知道这些函数是否可以进行单元测试,因为它们不返回任何内容.一个将一个项目放入队列,另一个则读取它.

推荐答案

这两个功能都不是"无尽的"--第一个功能将在sync_receive_data_packets返回的生成器耗尽时结束,另一个功能将在termination_signal_received()返回True时结束.只需使用mock.patch将这两个功能指向您的测试控制下的调用即可.您的测试应该进一步验证第一个函数是否填充了queue,以及是否使用了它并使用正确的值调用了sync_data_analisys(也应该修补它)

否则(如果用while True重复它们,没有停止条件),单元测试这些的方法将是编写一个专门的队列类,该类将用作队列,但当调用.get.put时,这可能会在测试代码的控制下引发异常.

Python相关问答推荐

有条件地采样我的大型DF的最有效方法

列表上值总和最多为K(以O(log n))的最大元素数

Python 3.12中的通用[T]类方法隐式类型检索

运行总计基于多列pandas的分组和总和

如何从具有不同len的列表字典中创建摘要表?

发生异常:TclMessage命令名称无效.!listbox"

在Wayland上使用setCellWidget时,try 编辑QTable Widget中的单元格时,PyQt 6崩溃

如何在python polars中停止otherate(),当使用when()表达式时?

Pandas—合并数据帧,在公共列上保留非空值,在另一列上保留平均值

如何将一个动态分配的C数组转换为Numpy数组,并在C扩展模块中返回给Python

不能使用Gekko方程'

启用/禁用shiny 的自动重新加载

如何在Python中使用Pandas将R s Tukey s HSD表转换为相关矩阵''

ConversationalRetrivalChain引发键错误

在matplotlib中使用不同大小的标记顶部添加批注

Python 3试图访问在线程调用中实例化的类的对象

使用嵌套对象字段的Qdrant过滤

从嵌套极轴列的列表中删除元素

计算机找不到已安装的库'

如何获得满足掩码条件的第一行的索引?