我遇到了一个Python脚本的问题,它在Windows 11上运行时(通过VS)与在Raspberry Pi4上运行时的性能不同.
该脚本已修改自一个CLi脚本,发现它与Victron Energy硬件交互.
多亏了@ ukbaz,脚本运行并从我货车上的蓝牙连接Victron设备返回数据. playbook 用了一句话:
loop.run_forever()
太棒了!两台机器都能正常工作我不想永远运行,只是5秒的持续时间,足够长的时间读取BT设备和写入txt文件以备将来使用.
在引入一个变化之后:
the_end = time.time() + 5
while time.time() < the_end:
loop.s到p()
loop.run_forever()
这将运行良好的Windows,读取BT设备,写入文件.在Rpi上,脚本运行时没有错误,但不会读取BT或写入文件.
以下是完整的脚本
from __future__ import annotations
import inspect
import json
import logging
import time
from enum import Enum
from typing import Set
import asyncio
from typing import List, Tuple
from threading import Thread
from bleak import BleakScanner
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
from victron_ble.devices import Device, DeviceData, detect_device_type
from victron_ble.exceptions import AdvertisementKeyMissingError, UnknownDeviceError
logger = logging.getLogger(__name__)
class BaseScanner:
def __init__(self) -> None:
"""Initialize the scanner."""
self._scanner: BleakScanner = BleakScanner(
detection_callback=self._detection_callback
)
self._seen_data: Set[bytes] = set()
def _detection_callback(self, device: BLEDevice, advertisement: AdvertisementData):
# Filter for Victron devices and instant readout advertisements
data = advertisement.manufacturer_data.get(0x02E1)
if not data or not data.startswith(b"\x10") or data in self._seen_data:
return
# De-duplicate advertisements
if len(self._seen_data) > 1000:
self._seen_data = set()
self._seen_data.add(data)
self.callback(device, data)
def callback(self, device: BLEDevice, data: bytes):
raise NotImplementedError()
async def start(self):
await self._scanner.start()
async def s到p(self):
await self._scanner.s到p()
# An ugly hack 到 print a class as JSON
class DeviceDataEncoder(json.JSONEncoder):
def default(self, obj):
if issubclass(obj.__class__, DeviceData):
data = {}
for name, method in inspect.getmembers(obj, predicate=inspect.ismethod):
if name.startswith("get_"):
value = method()
if isinstance(value, Enum):
value = value.name.lower()
if value is not None:
data[name[4:]] = value
return data
class Scanner(BaseScanner):
def __init__(self, device_keys: dict[str, str] = {}):
super().__init__()
self._device_keys = {k.lower(): v for k, v in device_keys.items()}
self._known_devices: dict[str, Device] = {}
async def start(self):
logger.info(f"Reading data for {list(self._device_keys.keys())}")
await super().start()
def get_device(self, ble_device: BLEDevice, raw_data: bytes) -> Device:
address = ble_device.address.lower()
if address not in self._known_devices:
advertisement_key = self.load_key(address)
device_klass = detect_device_type(raw_data)
if not device_klass:
raise UnknownDeviceError(
f"Could not identify device type for {ble_device}"
)
self._known_devices[address] = device_klass(advertisement_key)
return self._known_devices[address]
def load_key(self, address: str) -> str:
try:
return self._device_keys[address]
except KeyError:
raise AdvertisementKeyMissingError(f"No key available for {address}")
def callback(self, ble_device: BLEDevice, raw_data: bytes):
logger.debug(
f"Received data from {ble_device.address.lower()}: {raw_data.hex()}"
)
try:
device = self.get_device(ble_device, raw_data)
except AdvertisementKeyMissingError:
return
except UnknownDeviceError as e:
logger.error(e)
return
parsed = device.parse(raw_data)
blob = {
"name": ble_device.name,
"address": ble_device.address,
"rssi": ble_device.rssi,
"payload": parsed,
}
print(json.dumps(blob, cls=DeviceDataEncoder, indent=1))
ve_string = json.dumps(blob, cls=DeviceDataEncoder, indent=1)
print(ve_string)
#MAC_filename = "this_device" + ".txt"
#print(f"MAC filename: {MAC_filename}")
this_file = open("this_device.txt", "w")
this_file.write(ve_string)
this_file.close()
print("file written")
time.sleep(3)
class DiscoveryScanner(BaseScanner):
def __init__(self) -> None:
super().__init__()
self._seen_devices: Set[str] = set()
def callback(self, device: BLEDevice, advertisement: bytes):
if device.address not in self._seen_devices:
logger.info(f"{device}")
self._seen_devices.add(device.address)
class DebugScanner(BaseScanner):
def __init__(self, address: str):
super().__init__()
self.address = address
async def start(self):
logger.info(f"Dumping advertisements from {self.address}")
await super().start()
def callback(self, device: BLEDevice, data: bytes):
if device.address.lower() == self.address.lower():
logger.info(f"{time.time():<24}: {data.hex()}")
def my_scan(device_keys: List[Tuple[str, str]]):
loop = asyncio.get_event_loop()
async def scan(keys):
scanner = Scanner(keys)
await scanner.start()
asyncio.ensure_future(scan({k: v for k, v in device_keys}))
the_end = time.time() + 5
while time.time() < the_end:
loop.s到p()
loop.run_forever()
if __name__ == '__main__':
my_scan([("d5:55:aa:4d:99:33","149c3c2865054b71962dcb06866524a9")])
我试着从
run_forever
到
run_until_complete
这个方法在Rpi上失败了:(所以返回
为什么脚本在两台机器之间的行为会不同?我如何在RPi上复制?非常感谢你的帮助