我有一个循环,它可以获取传感器信息,使数据可用于实时仪表板,还可以写入CSV文件.我想继续提供实时数据,没有任何延迟,但只想每5分钟写入一次CSV文件.因为它会暂停所有事情,所以似乎对此不起作用.对如何处理这一问题有什么建议吗?

下面是相关的循环:

while True:
        with open('enviroplus.csv', 'a', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, delimiter=',', quotechar='|', 
quoting=csv.QUOTE_MINIMAL, fieldnames=['Date/Time','temperature','humidity','pressure','oxidising','reducing','nh3','lux','proximity','pm1','pm25',   'pm10'])                                                                                                                                                                            
            dt = datetime.now()
            if not file_exists:
                writer.writeheader()
                file_exists = True
            writer.writerow({'Date/Time': dt.strftime('%Y-%m-%d %H:%M:%S'), 
'temperature': ...})

    get_temperature(args.factor)
    get_pressure()
    get_humidity()
    get_light()
    if not args.enviro:
        get_gas()
        get_particulates()
    if DEBUG:
        logging.info('Sensor data: {}'.format(collect_all_data()))

推荐答案

将上次更新的时间存储在变量中,如果已经过了五分钟,则更新它:

last_updated = datetime.now() - timedelta(minutes=6)

while True:
    if ((datetime.now() - last_updated).seconds / 60) >= 5:
        with open('enviroplus.csv', 'a', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, delimiter=',', quotechar='|', 
quoting=csv.QUOTE_MINIMAL, fieldnames=['Date/Time','temperature','humidity','pressure','oxidising','reducing','nh3','lux','proximity','pm1','pm25',   'pm10'])                                                                                                                                                                            
            dt = datetime.now()
            if not file_exists:
                writer.writeheader()
                file_exists = True
            writer.writerow({'Date/Time': dt.strftime('%Y-%m-%d %H:%M:%S'), 
'temperature': ...})
        last_updated = dt
    get_temperature(args.factor)
    get_pressure()
    get_humidity()
    get_light()
    if not args.enviro:
        get_gas()
        get_particulates()
    if DEBUG:
        logging.info('Sensor data: {}'.format(collect_all_data()))

确保您也从datetime导入timedelta.请注意,last_updated最初设置为6分钟前,以确保文件在循环的第一次迭代时更新.

Python相关问答推荐

如何将Pydantic URL验证限制为特定主机或网站

重命名变量并使用载体中的字符串存储 Select 该变量

如何修复fpdf中的线路出血

Python中的锁定类和线程以实现dict移动

Python中两个矩阵的自定义Hadamard风格产物

如何将新的SQL服务器功能映射到SQL Alchemy的ORM

将numpy矩阵映射到字符串矩阵

具有多个选项的计数_匹配

Select 用a和i标签包裹的复选框?

有症状地 destruct 了Python中的regex?

_repr_html_实现自定义__getattr_时未显示

Excel图表-使用openpyxl更改水平轴与Y轴相交的位置(Python)

为什么这个带有List输入的简单numba函数这么慢

如何在虚拟Python环境中运行Python程序?

如果条件不满足,我如何获得掩码的第一个索引并获得None?

如何将多进程池声明为变量并将其导入到另一个Python文件

在vscode上使用Python虚拟环境时((env))

无法在Docker内部运行Python的Matlab SDK模块,但本地没有问题

使用BeautifulSoup抓取所有链接

python中csv. Dictreader. fieldname的类型是什么?'