如何将 microbit 与 BLE 连接并监听按钮按下事件?

2024-02-02

2021 年 11 月 28 日编辑:

如果您需要使用蓝牙低功耗将 microbit 连接到计算机,并在单击按钮时执行操作。直接跳并跟随@ukBaz https://stackoverflow.com/users/7721752/ukbaz的回答如下。

注意:该解决方案在 GNU/Linux 上完美运行,但在 Windows 上可能不太好。

以下是该帖子的原始问题。我不会编辑它来隐藏我的错误。


摘要:我有一个连接到 rpi-zero 的 microbit。我对 microbit 进行了编码,当A button按下后将通过uart.write到 rpi 为零。

在本次测试中,microbit 将uart.write("Test"),将“测试”字写入 rpi-0。

我的最终目标是使用 rpi-zero 的 BLE 功能充当控制设备,并通过 microbit 按钮发送指令。

我找到了这个关贸总协定服务器代码 https://scribles.net/creating-ble-gatt-server-uart-service-on-raspberry-pi/用 python 为 rpi 编写。它运行起来没有任何问题。

下面的代码将用于监听microbit uart服务并检查接收到的数据是否正确"Test":

import serial

serialPort = serial.Serial(port = "/dev/ttyACM0", baudrate=115200, bytesize=8, timeout=0.5, stopbits=serial.STOPBITS_ONE)

serialString = " "

(serialPort.in_waiting > 0)

while True:

        serialString = serialPort.readline()

        if serialString == b'Test':
            print("Yes")
        else:
            print("F")

但真正的问题是当我尝试将这个循环代码实现到 GATT 服务器代码中时。

我似乎无法弄清楚如何将此值传递给self.send_tx

此外,GATT 服务器代码中似乎已经存在全局循环。所以我尝试使用线程同时运行这两个函数,但是当我添加self.send_tx("Test")它只会抛出一个错误Self is not defined.

抱歉,我对编码完全是菜鸟,有人知道可能的解决办法吗?谢谢

这是完整的代码:

import sys
import threading
import dbus, dbus.mainloop.glib
import serial
from gi.repository import GLib
from example_advertisement import Advertisement
from example_advertisement import register_ad_cb, register_ad_error_cb
from example_gatt_server import Service, Characteristic
from example_gatt_server import register_app_cb, register_app_error_cb

BLUEZ_SERVICE_NAME =           'org.bluez'
DBUS_OM_IFACE =                'org.freedesktop.DBus.ObjectManager'
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
GATT_MANAGER_IFACE =           'org.bluez.GattManager1'
GATT_CHRC_IFACE =              'org.bluez.GattCharacteristic1'
UART_SERVICE_UUID =            '6e400001-b5a3-f393-e0a9-e50e24dcca9e'
UART_RX_CHARACTERISTIC_UUID =  '6e400002-b5a3-f393-e0a9-e50e24dcca9e'
UART_TX_CHARACTERISTIC_UUID =  '6e400003-b5a3-f393-e0a9-e50e24dcca9e'
LOCAL_NAME =                   'rpi-gatt-server'
mainloop = None

serialPort = serial.Serial(port = "/dev/ttyACM0", baudrate=115200, bytesize=8, timeout=0.8, stopbits=serial.STOPBITS_ONE)

serialString = " "

(serialPort.in_waiting > 0)

class TxCharacteristic(Characteristic):
    def __init__(self, bus, index, service):
        Characteristic.__init__(self, bus, index, UART_TX_CHARACTERISTIC_UUID,
                                ['notify'], service)
        self.notifying = False
        GLib.io_add_watch(sys.stdin, GLib.IO_IN, self.on_console_input)

    def on_console_input(self, fd, condition):
        s = fd.readline()
        if s.isspace():
            pass
        else:
            self.send_tx(s)
        return True

    def send_tx(self, s):
        if not self.notifying:
            return
        value = []
        for c in s:
            value.append(dbus.Byte(c.encode()))
        self.PropertiesChanged(GATT_CHRC_IFACE, {'Value': value}, [])

    def StartNotify(self):
        if self.notifying:
            print("yes")
            return
        self.notifying = True

    def StopNotify(self):
        if not self.notifying:
            print("no")
            return
        self.notifying = False

class RxCharacteristic(Characteristic):
    def __init__(self, bus, index, service):
        Characteristic.__init__(self, bus, index, UART_RX_CHARACTERISTIC_UUID,
                                ['write'], service)

    def WriteValue(self, value, options):
        print('remote: {}'.format(bytearray(value).decode()))

class UartService(Service):
    def __init__(self, bus, index):
        Service.__init__(self, bus, index, UART_SERVICE_UUID, True)
        self.add_characteristic(TxCharacteristic(bus, 0, self))
        self.add_characteristic(RxCharacteristic(bus, 1, self))

class Application(dbus.service.Object):
    def __init__(self, bus):
        self.path = '/'
        self.services = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_path(self):
        return dbus.ObjectPath(self.path)

    def add_service(self, service):
        self.services.append(service)

    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        response = {}
        for service in self.services:
            response[service.get_path()] = service.get_properties()
            chrcs = service.get_characteristics()
            for chrc in chrcs:
                response[chrc.get_path()] = chrc.get_properties()
        return response

class UartApplication(Application):
    def __init__(self, bus):
        Application.__init__(self, bus)
        self.add_service(UartService(bus, 0))

class UartAdvertisement(Advertisement):
    def __init__(self, bus, index):
        Advertisement.__init__(self, bus, index, 'peripheral')
        self.add_service_uuid(UART_SERVICE_UUID)
        self.add_local_name(LOCAL_NAME)
        self.include_tx_power = True

def find_adapter(bus):
    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
                               DBUS_OM_IFACE)
    objects = remote_om.GetManagedObjects()
    for o, props in objects.items():
        if LE_ADVERTISING_MANAGER_IFACE in props and GATT_MANAGER_IFACE in props:
            return o
        print('Skip adapter:', o)
    return None

def check():
    while True:
        serialString = serialPort.readline()
        if serialString == b'Test':
            print("Okay, Test")
            self.send_tx("Test")
        else:
            print("No")

def main():
    global mainloop
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()
    adapter = find_adapter(bus)
    if not adapter:
        print('BLE adapter not found')
        return

    service_manager = dbus.Interface(
                                bus.get_object(BLUEZ_SERVICE_NAME, adapter),
                                GATT_MANAGER_IFACE)
    ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
                                LE_ADVERTISING_MANAGER_IFACE)

    app = UartApplication(bus)
    adv = UartAdvertisement(bus, 0)

    mainloop = GLib.MainLoop()

    service_manager.RegisterApplication(app.get_path(), {},
                                        reply_handler=register_app_cb,
                                        error_handler=register_app_error_cb)
    ad_manager.RegisterAdvertisement(adv.get_path(), {},
                                     reply_handler=register_ad_cb,
                                     error_handler=register_ad_error_cb)

    try:
        mainloop.run()
    except KeyboardInterrupt:
        adv.Release()

if __name__ == '__main__':
    p1 = threading.Thread(target=main)
    p2 = threading.Thread(target=check)
    p1.start()
    p2.start()

我想这可能是一个XY问题 https://en.wikipedia.org/wiki/XY_problem。据我了解,您希望通过低功耗蓝牙 (BLE) 将按钮按下操作从 micro:bit 发送到 RPi。如果是这样的话,那么有一种更有效的方法可以做到这一点。

In the 蓝牙配置文件 https://lancaster-university.github.io/microbit-docs/resources/bluetooth/bluetooth_profile.html对于 micro:bit,有一个按钮服务和一个可以使用的按钮 A 状态特征。他们的 UUID 是:

BTN_SRV = 'E95D9882-251D-470A-A062-FA1922DFA9A8'
BTN_A_STATE = 'E95DDA90-251D-470A-A062-FA1922DFA9A8'

You need to set up the GATT server on the micro:bit. The most efficient way to do this is using https://makecode.microbit.org/#editor https://makecode.microbit.org/#editor to create the following: enter image description here

如果左侧菜单上没有蓝牙,请单击屏幕右上角的齿轮,选择Extensions,然后选择Bluetooth来替换Radio扩大。

RPi 上的 GATT 客户端代码可以通过使用来简化pydbus https://pypi.org/project/pydbus/D-Bus 绑定库。

有了蓝牙,而不是有一个while循环并不断轮询 micro:bit,使用事件循环订阅来自(在本例中)按钮 A 特性的通知会更有效。我命名的函数btn_handler每次按下或释放 micro:bit 按钮 A 时调用。

下面的代码不会在 micro:bit 和 RPi 之间进行初始配对。由于配对是一次性配置步骤,因此我手动执行此操作。

下面是 RPi 的示例 Python 代码,它响应 micro:bit 上按下的按钮 A...

from time import sleep
import pydbus
from gi.repository import GLib

DEVICE_ADDR = 'DE:82:35:E7:CE:BE' #  micro:bit address
BTN_A_STATE = 'E95DDA90-251D-470A-A062-FA1922DFA9A8'

# DBus object paths
BLUEZ_SERVICE = 'org.bluez'
ADAPTER_PATH = '/org/bluez/hci0'
device_path = f"{ADAPTER_PATH}/dev_{DEVICE_ADDR.replace(':', '_')}"

# setup dbus
bus = pydbus.SystemBus()
mngr = bus.get(BLUEZ_SERVICE, '/')
adapter = bus.get(BLUEZ_SERVICE, ADAPTER_PATH) 
device = bus.get(BLUEZ_SERVICE, device_path)

device.Connect()

while not device.ServicesResolved:
    sleep(0.5)

def get_characteristic_path(dev_path, uuid):
    """Look up DBus path for characteristic UUID"""
    mng_objs = mngr.GetManagedObjects()
    for path in mng_objs:
        chr_uuid = mng_objs[path].get('org.bluez.GattCharacteristic1', {}).get('UUID')
        if path.startswith(dev_path) and chr_uuid == uuid.casefold():
           return path

# Characteristic DBus information
btn_a_path = get_characteristic_path(device._path, BTN_A_STATE)
btn_a = bus.get(BLUEZ_SERVICE, btn_a_path)
# Read button A without event loop notifications
print(btn_a.ReadValue({}))

# Enable eventloop for notifications
def btn_handler(iface, prop_changed, prop_removed):
    """Notify event handler for button press"""
    if 'Value' in prop_changed:
        new_value = prop_changed['Value']
        print(f"Button A state: {new_value}")
        print(f'As byte: {bytes(new_value)}')
        print(f'As bytearray: {bytearray(new_value)}')
        print(f'As int: {int(new_value[0])}')
        print(f'As bool: {bool(new_value[0])}')

mainloop = GLib.MainLoop()
btn_a.onPropertiesChanged = btn_handler
btn_a.StartNotify()
try:
    mainloop.run()
except KeyboardInterrupt:
    mainloop.quit()
    btn_a.StopNotify()
    device.Disconnect()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何将 microbit 与 BLE 连接并监听按钮按下事件? 的相关文章

随机推荐