MQTT Discovery in Python

 Voor een toepassing met een 32x32 led matrix van Adafruit zocht ik een manier om bepaalde instellingen van dit display te kunnen triggeren vanaf Home Assistant.

Mijn opstelling was als volgt:

Adafruit RGB Matrix Bonnet
  • Een Raspberry Pi 3B+ met Adafruit's RGB Matrix bonnet
  • Een 32x32 RGB Matrix LED display
  • Een grote rode knop met LED
In eerste instantie heb ik de LED in de knop aangestuurd met behulp van de 'pigpiod' software (http://abyz.me.uk/rpi/pigpio/pigpiod.html), deze software biedt de mogelijkheid om de gpio pins van een Raspberry pi op afstand aan te sturen, en dit wordt binnen Home Assistant ondersteund (https://www.home-assistant.io/integrations/rpi_gpio/).
Zo kon ik dus zowel de knop als de LED als respectievelijk een binaire sensor en een switch (of light) binnen Home Assistant gebruiken.
Adafruit 32x32 RGB Led Matrix

Daarnaast wilde ik de animatie van het LED display kunnen sturen. Na veel gepuzzel kwam ik op de volgende oplossing:
  1. Een script dat luistert op een MQTT topic en dit schrijft naar een 'named pipe' in linux
  2. Een script dat deze 'named topic' uitleest en actie onderneemt als er iets op gezet wordt
Dat deed een deel van wat ik wilde, maar ik kon nu nog alleen maar een tekst laten zien op het RGB display. Toen bedacht ik me dat ik voor de Arduino controllers de 'MQTT discovery' had gebruikt om een aantal zaken in Home Assistant bekend te maken, dus waarom ook niet in dit geval?

Ik maakte dus een script in Python dat de volgende zaken regelt:
  1. Publiceer een 'light' voor de helderheid van het LED display op MQTT
  2. Publiceer een scène voor elke animatie op MQTT
  3. Luister op de bijbehorende MQTT topics en pas de animatie aan op basis van de invoer
Deel 1:

def send_config_light(client):
    device_info = {
        "name" : ha_name,
        "uniq_id" : ha_uniq_id,
        "stat_t" : ha_status_topic,
        "cmd_t" : ha_command_topic,
        "schema" : "json",
        "brightness" : True
        }
    device_info["dev"] = { "name" : ha_discovery_id, "mdl" : ha_model_id, "mf" : ha_manufacterer_id }
    device_info["dev"]["ids"] = [ ha_client_id ]
    device_info["dev"]["cns"] = [ "ip", local_ip ],[ "mac", local_mac ]
    json_device_info = json.dumps(device_info)
    config_topic = "homeassistant/light/" + ha_uniq_id + "/config"
    client.publish(config_topic, json_device_info)
    
Deel 2:

def send_config_scene(client):
    for i in ha_scenes:
        device_info = {
            "name" : i,
            "uniq_id" : ha_scenes[i],
            "cmd_t" : ha_command_topic_scene,
            "payload_on" : json.dumps({ "activate_scene" : i})
            }
        json_device_info = json.dumps(device_info)
        config_topic = "homeassistant/scene/" + ha_scenes[i] + "/config"
        client.publish(config_topic, json_device_info)
        
Deel 3:
def on_message(client, userdata, message):
    global songinfo, songpic, rt2
    if message.topic == ha_command_topic:
      payload_txt = json.loads(str(message.payload.decode("utf-8")))
      if payload_txt["state"] == "ON":
          print("Switching on display")
          if "brightness" in payload_txt:  
            b_255 = int((payload_txt["brightness"]))
            b = int(mapFromTo(b_255, 0, 255, 0, 100))
            subprocess.call(["/usr/local/bin/ledmenu", "-b", str(b)])
            message = { "state" : "ON", "brightness" : b_255 }
            send_status(client, ha_status_topic, message)
          else:
            subprocess.call(["/usr/local/bin/ledmenu"])
            message = { "state" : "ON" }
            send_status(client, ha_status_topic, message)
      else:
            print("Switching off display")
            subprocess.call(["/usr/local/bin/ledmenu", "-k"])
            message = { "state" : "OFF" }
            send_status(client, ha_status_topic, message)
    elif message.topic == ha_command_topic_scene:
      payload_txt = json.loads(str(message.payload.decode("utf-8")))
      if "activate_scene" in payload_txt:
        print("Activating scene")
        mode = (switch_demo(payload_txt["activate_scene"]))
        subprocess.call(["/usr/local/bin/ledmenu", "-m", str(mode)])
    elif message.topic == ha_topic_song:
      songinfo = message.payload.decode("utf-8")
    elif message.topic == ha_topic_songpic:
      songpic = message.payload.decode("utf-8")
      
Hierbij roept de luisterende 'on_message' routine dus een lokaal script aan '/usr/local/bin/ledmenu', waar de eigenlijke sturing van het ledpaneel mee wordt gedaan. 
 
Het vervolg was natuurlijk om ook de led in de rode knop hiermee aan te sturen, dit gaat op een vergelijkbare manier, eerst een configuratie aanmaken middels MQTT:
 
def send_config_buttonlight(client):
    device_info = {
        "name" : ha_name_buttonlight,
        "uniq_id" : ha_uniq_id_buttonlight,
        "stat_t" : ha_status_topic_buttonlight,
        "cmd_t" : ha_command_topic_buttonlight,
        "schema" : "json",
        "brightness" : False,
        "effect" : True,
        "effect_list" : ["Blink short", "Blink long"],
        "flash_time_short" : blink_short,
        "flash_time_long" : blink_long
        }
    device_info["dev"] = { "name" : ha_discovery_id, "mdl" : ha_model_id, "mf" : ha_manufacterer_id }
    device_info["dev"]["ids"] = [ ha_client_id ]
    device_info["dev"]["cns"] = [ "ip", local_ip ],[ "mac", local_mac ]
    json_device_info = json.dumps(device_info)
    config_topic = "homeassistant/light/" + ha_uniq_id_buttonlight + "/config"
    client.publish(config_topic, json_device_info)

 Ik maak hiermee dus een 'light' aan in Home Assistant, zonder brightness (helaas geen PWM pinnen meer over), maar wel met een paar 'effects': 'Blink short' en 'Blink long'

 Vervolgens de commando's verwerken in de callback routine:

    elif message.topic == ha_command_topic_buttonlight:
      payload_txt = json.loads(str(message.payload.decode("utf-8")))
      if payload_txt["state"] == "ON":
        print("Switching on led button")
        if "effect" in payload_txt:
            try:
              rt2.stop()
            except:
              print("No thread running")
            effect = payload_txt["effect"]
            if effect == 'Blink short':
              rt2 = RepeatedTimer(blink_short, blinkled, client)
              print("Starting thread")
              print(rt2)
            elif effect == 'Blink long':              
              rt2 = RepeatedTimer(blink_long, blinkled, client)
              print("Starting thread")
              print(rt2)
        else:
            switchled(True)
        message = { "state" : "ON" }
        send_status(client, ha_status_topic_buttonlight, message)
      else:
        print("Switching off led button")
        try:
          rt2.stop()
        except:
          print("No thread running")
        switchled(False)
        message = { "state" : "OFF" }
        send_status(client, ha_status_topic_buttonlight, message)

Ik maak hierbij gebruik van een 'Repeating Timer' om de 'blink' functie te realiseren, deze start een aparte thread in python met een timer die elk interval de led omschakelt. Deze timer blijft onafhankelijk van de hoofdroutine lopen zolang hij niet met het rt2.stop commando wordt afgebroken.

Als laatste moet natuurlijk ook de knop zelf via deze routine gedetecteerd worden, hiervoor maken we een 'device automation' aan:

def send_config_trigger(client):
    device_info = {
        "automation_type" : "trigger",
        "topic" : ha_topic_trigger,
        "type" : "button_short_release",
        "subtype" : "button_1",
        "payload" : "Button0"
        }
    device_info["dev"] = { "name" : ha_discovery_id, "mdl" : ha_model_id, "mf" : ha_manufacterer_id }
    device_info["dev"]["ids"] = [ ha_client_id ]
    device_info["dev"]["cns"] = [ "ip", local_ip ],[ "mac", local_mac ]
    json_device_info = json.dumps(device_info)
    config_topic = "homeassistant/device_automation/" + ha_uniq_id_trigger + "/config"
    client.publish(config_topic, json_device_info)

Deze automation verwacht een 'Button0' message op het ha_topic_trigger topic via MQTT,dus daar maken we een listener voor:

def button_callback(gpio, level, tick):
    print("Button pressed")
    client.publish(ha_topic_trigger, "Button0")

Om deze callback te activeren voor de drukknop gebruiken we de callback functie van de pigpio library (http://abyz.me.uk/rpi/pigpio/python.html#callback):

cb1 = gpios.callback(triggerpin, pigpio.RISING_EDGE, button_callback)

En in Home Assistant de juiste 'automation' die luistert op dit event:

- alias: Rode knop huiskamer
  id: rode_knop_huiskamer
  description: ''
  trigger:
  - platform: device
    domain: mqtt
    device_id: cca0621626af8d25ebcf1bb96a46ffaf
    type: button_short_release
    subtype: button_1
    discovery_id: hassbian_button
  condition: []
  action:
  - ...
  mode: restart

En zo kan ik de hele besturing van het display en de knop via een enkel python script regelen.

Reacties

Populaire posts van deze blog

Automatisering in Home Assistant

Automatische gordijnen