我遇到了一个Python脚本的问题,它在Windows 11上运行时(通过VS)与在Raspberry Pi4上运行时的性能不同.

该脚本已修改自一个CLi脚本,发现它与Victron Energy硬件交互.

多亏了@ ukbaz,脚本运行并从我货车上的蓝牙连接Victron设备返回数据. playbook 用了一句话:

loop.run_forever()

太棒了!两台机器都能正常工作我不想永远运行,只是5秒的持续时间,足够长的时间读取BT设备和写入txt文件以备将来使用.

在引入一个变化之后:

 the_end = time.time() + 5
    while time.time() < the_end:

        loop.s到p()
        loop.run_forever()

这将运行良好的Windows,读取BT设备,写入文件.在Rpi上,脚本运行时没有错误,但不会读取BT或写入文件.

以下是完整的脚本

from __future__ import annotations

import inspect
import json
import logging
import time
from enum import Enum
from typing import Set
import asyncio
from typing import List, Tuple
from threading import Thread

from bleak import BleakScanner
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData

from victron_ble.devices import Device, DeviceData, detect_device_type
from victron_ble.exceptions import AdvertisementKeyMissingError, UnknownDeviceError

logger = logging.getLogger(__name__)


class BaseScanner:
    def __init__(self) -> None:
        """Initialize the scanner."""
        self._scanner: BleakScanner = BleakScanner(
            detection_callback=self._detection_callback
        )
        self._seen_data: Set[bytes] = set()

    def _detection_callback(self, device: BLEDevice, advertisement: AdvertisementData):
        # Filter for Victron devices and instant readout advertisements
        data = advertisement.manufacturer_data.get(0x02E1)
        if not data or not data.startswith(b"\x10") or data in self._seen_data:
            return

        # De-duplicate advertisements
        if len(self._seen_data) > 1000:
            self._seen_data = set()
        self._seen_data.add(data)

        self.callback(device, data)

    def callback(self, device: BLEDevice, data: bytes):
        raise NotImplementedError()

    async def start(self):
        await self._scanner.start()

    async def s到p(self):
        await self._scanner.s到p()


# An ugly hack 到 print a class as JSON
class DeviceDataEncoder(json.JSONEncoder):
    def default(self, obj):
        if issubclass(obj.__class__, DeviceData):
            data = {}
            for name, method in inspect.getmembers(obj, predicate=inspect.ismethod):
                if name.startswith("get_"):
                    value = method()
                    if isinstance(value, Enum):
                        value = value.name.lower()
                    if value is not None:
                        data[name[4:]] = value
            return data


class Scanner(BaseScanner):
    def __init__(self, device_keys: dict[str, str] = {}):
        super().__init__()
        self._device_keys = {k.lower(): v for k, v in device_keys.items()}
        self._known_devices: dict[str, Device] = {}

    async def start(self):
        logger.info(f"Reading data for {list(self._device_keys.keys())}")
        await super().start()

    def get_device(self, ble_device: BLEDevice, raw_data: bytes) -> Device:
        address = ble_device.address.lower()
        if address not in self._known_devices:
            advertisement_key = self.load_key(address)

            device_klass = detect_device_type(raw_data)
            if not device_klass:
                raise UnknownDeviceError(
                    f"Could not identify device type for {ble_device}"
                )

            self._known_devices[address] = device_klass(advertisement_key)
        return self._known_devices[address]

    def load_key(self, address: str) -> str:
        try:
            return self._device_keys[address]
        except KeyError:
            raise AdvertisementKeyMissingError(f"No key available for {address}")

    def callback(self, ble_device: BLEDevice, raw_data: bytes):
        logger.debug(
            f"Received data from {ble_device.address.lower()}: {raw_data.hex()}"
        )
        try:
            device = self.get_device(ble_device, raw_data)
        except AdvertisementKeyMissingError:
            return
        except UnknownDeviceError as e:
            logger.error(e)
            return
        parsed = device.parse(raw_data)

        blob = {
            "name": ble_device.name,
            "address": ble_device.address,
            "rssi": ble_device.rssi,
            "payload": parsed,
        }
        print(json.dumps(blob, cls=DeviceDataEncoder, indent=1))
        ve_string = json.dumps(blob, cls=DeviceDataEncoder, indent=1)
        print(ve_string)
        #MAC_filename = "this_device" + ".txt"
        #print(f"MAC filename: {MAC_filename}")

        this_file = open("this_device.txt", "w")
        this_file.write(ve_string)
        this_file.close()

        print("file written")
        time.sleep(3)


class DiscoveryScanner(BaseScanner):
    def __init__(self) -> None:
        super().__init__()
        self._seen_devices: Set[str] = set()

    def callback(self, device: BLEDevice, advertisement: bytes):
        if device.address not in self._seen_devices:
            logger.info(f"{device}")
            self._seen_devices.add(device.address)


class DebugScanner(BaseScanner):
    def __init__(self, address: str):
        super().__init__()
        self.address = address

    async def start(self):
        logger.info(f"Dumping advertisements from {self.address}")
        await super().start()

    def callback(self, device: BLEDevice, data: bytes):
        if device.address.lower() == self.address.lower():
            logger.info(f"{time.time():<24}: {data.hex()}")



def my_scan(device_keys: List[Tuple[str, str]]):
    
    loop = asyncio.get_event_loop()

    async def scan(keys):
        scanner = Scanner(keys)
        await scanner.start()

    asyncio.ensure_future(scan({k: v for k, v in device_keys}))

    the_end = time.time() + 5
    while time.time() < the_end:

        loop.s到p()
        loop.run_forever()

if __name__ == '__main__':

    my_scan([("d5:55:aa:4d:99:33","149c3c2865054b71962dcb06866524a9")])


我试着从

run_forever 

run_until_complete

这个方法在Rpi上失败了:(所以返回

为什么脚本在两台机器之间的行为会不同?我如何在RPi上复制?非常感谢你的帮助

推荐答案

我怀疑你只想在文件被写入后退出.

也许你可以使用E—cio事件功能? https://docs.python.org/3/library/asyncio-sync.html#event

导入后,通过在文件顶部放置以下内容,在全局作用域中创建一个事件.

file_written_event = asyncio.Event()

然后在文件写入后添加file_written_event.set().例如

        this_file = open("this_device.txt", "w")
        this_file.write(ve_string)
        this_file.close()

        print("file written")
        file_written_event.set()

你档案的底部会改变最多.它做的基本相同,但有一个while循环等待事件发生,然后再退出.

async def my_scan(device_keys: List[Tuple[str, str]]):

    async def scan(keys):
        scanner = Scanner(keys)
        await scanner.start()

    asyncio.ensure_future(scan({k: v for k, v in device_keys}))
    while not file_written_event.is_set():
        await asyncio.sleep(0.1)


if __name__ == '__main__':
    logging.basicConfig()
    logger.setLevel(logging.DEBUG)
    asyncio.run(my_scan([("d5:55:aa:4d:99:33","149c3c2865054b71962dcb06866524a9")]))

Python相关问答推荐

配置Sweetviz以分析对象类型列,而无需转换

根据条件将新值添加到下面的行或下面新创建的行中

比较2 PD.数组的令人惊讶的结果

使用索引列表列表对列进行切片并获取行方向的向量长度

将数据框架与导入的Excel文件一起使用

如何让程序打印新段落上的每一行?

Godot:需要碰撞的对象的AdditionerBody2D或Area2D以及queue_free?

python中的解释会在后台调用函数吗?

Python Pandas—时间序列—时间戳缺失时间精确在00:00

重置PD帧中的值

巨 Python :逆向猜谜游戏

如何从比较函数生成ngroup?

PySpark:如何最有效地读取不同列位置的多个CSV文件

为什么后跟inplace方法的`.rename(Columns={';b';:';b';},Copy=False)`没有更新原始数据帧?

Python OPCUA,modbus通信代码运行3小时后出现RuntimeError

高效生成累积式三角矩阵

Polars表达式无法访问中间列创建表达式

函数()参数';代码';必须是代码而不是字符串

Pandas ,快速从词典栏中提取信息到新栏

如何批量训练样本大小为奇数的神经网络?