#!/usr/bin/env python3 """ This example script configures an RGB downlight that has just been flashed with the OpenBK7231N_App firmware. It was used on a Ubuntu 20.04 machine, but should be customized for your own use case. Search this file for "here>" to find the places that need to be customized. See scripts/README.md for example usage You will need to install requests and paho-mqtt: pip install requests paho-mqtt """ import subprocess import os import time import requests import pathlib import paho.mqtt.client as mqtt AP_PREFIX = "OpenBK7231N_" MQTT_HOST = os.environ["MQTT_HOST"] SWITCH_TOPIC = os.environ["SWITCH_TOPIC"] mqtt_client = mqtt.Client() COLOR_MAP = { "1": "#2200000000", "2": "#0022000000", "3": "#0000220000", "4": "#2200220000", } def get_wifi_adapter(): # Get the wifi adapter name # Return the adapter name p = subprocess.run( ["nmcli", "dev", "status"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) for l in p.stdout.splitlines(): if " wifi " in l: return l.split()[0] raise ValueError("Failed to find wifi adapter") def list_aps(): # List the available APs # Return a list of APs subprocess.run(["nmcli", "radio", "wifi", "off"]) time.sleep(1) subprocess.run(["nmcli", "radio", "wifi", "on"]) time.sleep(1) p = subprocess.run( [ "nmcli", "-t", "-f", "SSID,SECURITY", "dev", "wifi", "list", "--rescan", "yes", ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) open_ssids = [] for l in p.stdout.splitlines(): ssid, security = l.split(":") if security == "": open_ssids.append(ssid) return open_ssids def find_ap(): # Ensure the interface is in managed mode subprocess.run(["nmcli", "device", "set", get_wifi_adapter(), "managed", "yes"]) while 1: print("Scanning for APs...") for ap in list_aps(): if ap.startswith(AP_PREFIX): print(f"Found AP: {ap}") return ap time.sleep(3) def connect_ap(ap): # Connect to the AP # Return True if successful, False otherwise p = subprocess.run( ["nmcli", "dev", "wifi", "connect", ap, "name", ap], ) if p.returncode != 0: raise ValueError("Failed to connect to AP") print("Connected to AP") ID_PATH = pathlib.Path("light-id") def get_id(): if not ID_PATH.exists(): ID_PATH.write_text("1") return int(ID_PATH.read_text()) + 1 def write_id(id): ID_PATH.write_text(str(id)) def make_req(url): print("Making request: ", url) RETRIES = 5 for i in range(RETRIES): try: r = requests.get(url, timeout=5) if not r.ok: raise ValueError(f"Failed to make request: {url}") print("Done") time.sleep(1) return except Exception as e: print(i + 1, "Error making request:", e) time.sleep(3) raise ValueError(f"Failed to make request: {url} after {RETRIES} retries") STATE = {"online": False} def on_message(client, userdata, message): print("Message received (IP Address): ", str(message.payload.decode("utf-8"))) parts = message.topic.split("/") msg = message.payload.decode("utf-8") if parts[-1] == "ip": print("Online") STATE["online"] = True def send_command(command, value): mqtt_client.publish(f"cmnd/{device_name}/{command}", value) time.sleep(1) def main(): global device_name print(f"MQTT_HOST: {MQTT_HOST}, SWITCH_TOPIC: {SWITCH_TOPIC}") dev_id = get_id() device_name = f"downlight{dev_id}" print("Device name:", device_name) # Find the AP to connect to ap = find_ap() # Connect to the AP connect_ap(ap) # Configure the pins make_req( "http://192.168.4.1/cfg_pins?0=0&r0=0&1=0&r1=0&2=0&r2=0&3=0&r3=0&4=0&r4=0&5=0&r5=0&6=0&r6=0&7=23&r7=0&8=24&r8=0&9=0&r9=0&10=0&r10=0&11=0&r11=0&12=0&r12=0&13=0&r13=0&14=0&r14=0&15=0&r15=0&16=0&r16=0&17=0&r17=0&18=0&r18=0&19=0&r19=0&20=0&r20=0&21=0&r21=0&22=0&r22=0&23=0&r23=0&24=0&r24=0&25=0&r25=0&26=0&r26=0&27=0&r27=0&28=0&0=r28" ) # Configure MQTT make_req( f"http://192.168.4.1/cfg_mqtt_set?host=&port=1883&client={device_name}&group=dining&user=&password=" ) # Set default startup color make_req( "http://192.168.4.1/startup_command?data=backlog+led_basecolor_rgbcw+%230000007777%3B+led_enableAll+1%3B&startup_cmd=1" ) make_req( "http://192.168.4.1/cfg_wifi_set?ssid=&pass=" ) mqtt_client.connect(MQTT_HOST) mqtt_client.on_message = on_message mqtt_client.loop_start() mqtt_client.subscribe(f"{device_name}/ip") # Turn off the switch print("Rebooting device...") mqtt_client.publish(SWITCH_TOPIC, "off") time.sleep(2) mqtt_client.publish(SWITCH_TOPIC, "on") # Wait for the device to connect print("Waiting for device to connect via MQTT...", end="", flush=True) while not STATE["online"]: time.sleep(1) print(".", end="", flush=True) print("Connected") output_color = COLOR_MAP.get(SWITCH_TOPIC[-1], "#2200000000") # Set the device name send_command("ShortName", device_name) send_command("FriendlyName", device_name) send_command("led_basecolor_rgbcw", output_color) send_command("power", "1") write_id(dev_id) print(f"Flashed device {device_name} successfully") if __name__ == "__main__": main()