我正在单元测试Python代码.它有一个spocio循环.该循环中的两个主要函数具有无尽的while循环.我已经测试了代码的非Inc部分.我想知道单元测试这两个功能的最佳策略是什么:
import asyncio
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(run_async_tasks())
.
.
async def run_async_tasks():
# initialize loop and its variables
loop = asyncio.get_event_loop()
queue = asyncio.Queue(maxsize=100)
# Creating Asyncio tasks
task1 = loop.create_task(receive_data(queue), name="receive_data")
task2 = loop.create_task(analyse_data(queue), name="analyse_data")
await asyncio.gather(task1, task2)
async def receive_data(queue):
# Data packets are yielded from an non-ending stream by sync_receive_data_packets
for data_packet in sync_receive_data_packets():
await queue.put(data_packet)
async def analyse_data(queue):
while not termination_signal_received():
data_packet = await queue.get()
sync_data_analysis(data_packet)
queue.task_done()
我的问题是是否/如何可以对receive_data
和analyse_data
进行单元测试.
为了简单起见,用于读取数据(sync_receive_data_packets
)和执行分析(sync_data_analysis
)的主要逻辑采用非可编程功能.它们已成功测试.我不知道如何测试await queue.put
和await queue.get()
部分,尤其是当它们处于无限循环中时.
事实上,我想知道这些函数是否可以进行单元测试,因为它们不返回任何内容.一个将一个项目放入队列,另一个则读取它.