Files
tuya-cloudcutter/scripts/configure-downlight.py-example
sillyfrog ca40fd0d2f Optional Helper scripts (#356)
* Helper script support

---------

Co-authored-by: Sillyfrog <sillyfrog@users.noreply.github.com>
2023-04-15 08:21:13 -05:00

220 lines
5.6 KiB
Python

#!/usr/bin/env python3
"""
This example script configures an RGB downlight that has just been flashed with the
OpenBK7231N_App firmware.
It was used on an Ubuntu 20.04 machine, but should be customized for your own use case.
Search this file for "here>" to find the places that need to be customized.
See scripts/README.md for example usage
You will need to install requests and paho-mqtt:
pip install requests paho-mqtt
"""
import subprocess
import os
import time
import requests
import pathlib
import paho.mqtt.client as mqtt
# find_ap() picks the first open access point whose SSID starts with this prefix.
AP_PREFIX = "OpenBK7231N_"

# Required environment variables; a missing one raises KeyError at import time.
# MQTT_HOST is the broker this script connects to.
MQTT_HOST = os.environ["MQTT_HOST"]
# SWITCH_TOPIC receives "off"/"on" from main(); its last character selects
# the entry of COLOR_MAP that is sent to the device.
SWITCH_TOPIC = os.environ["SWITCH_TOPIC"]

# paho-mqtt 2.x added a callback API version argument to Client() and 2.0
# rejects a call without it. Pass it when the installed library defines it and
# keep the plain 1.x call otherwise. Only on_message is registered by this
# script, and its signature is the same in both callback API versions.
_callback_api = getattr(mqtt, "CallbackAPIVersion", None)
if _callback_api is None:
    mqtt_client = mqtt.Client()
else:
    mqtt_client = mqtt.Client(_callback_api.VERSION2)

# Values sent with the "led_basecolor_rgbcw" command, keyed by the last
# character of SWITCH_TOPIC (presumably RRGGBBCCWW hex - confirm against the
# firmware documentation).
COLOR_MAP = {
    "1": "#2200000000",
    "2": "#0022000000",
    "3": "#0000220000",
    "4": "#2200220000",
}
def get_wifi_adapter():
    """Return the device name of the first wifi adapter listed by nmcli.

    Runs ``nmcli dev status`` and takes the first whitespace-separated column
    of the first output line that contains " wifi ".

    Raises:
        ValueError: if no output line contains " wifi ".
    """
    status = subprocess.run(
        ["nmcli", "dev", "status"],
        capture_output=True,
        text=True,
    )
    adapter = next(
        (row.split()[0] for row in status.stdout.splitlines() if " wifi " in row),
        None,
    )
    if adapter is None:
        raise ValueError("Failed to find wifi adapter")
    return adapter
def _unescape_nmcli_field(value):
    """Undo nmcli terse-mode escaping, where a backslash makes the next character literal."""
    result = []
    chars = iter(value)
    for ch in chars:
        if ch == "\\":
            # Take the escaped character verbatim; keep a trailing lone backslash.
            ch = next(chars, "\\")
        result.append(ch)
    return "".join(result)


def list_aps():
    """Rescan and return the SSIDs of access points that report no security.

    The wifi radio is switched off and on again before the scan (presumably
    to discard stale scan results). The scan itself is
    ``nmcli -t -f SSID,SECURITY dev wifi list --rescan yes``, which prints one
    ``SSID:SECURITY`` line per access point.

    Returns:
        list[str]: SSIDs whose SECURITY field is empty, in scan order. A
        hidden open network contributes an empty string.
    """
    subprocess.run(["nmcli", "radio", "wifi", "off"])
    time.sleep(1)
    subprocess.run(["nmcli", "radio", "wifi", "on"])
    time.sleep(1)
    p = subprocess.run(
        [
            "nmcli",
            "-t",
            "-f",
            "SSID,SECURITY",
            "dev",
            "wifi",
            "list",
            "--rescan",
            "yes",
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    open_ssids = []
    for line in p.stdout.splitlines():
        # In terse mode a ':' inside the SSID is emitted as '\:', so a plain
        # split(":") yields more than two parts and the unpacking fails.
        # Split on the last colon instead; this assumes the SECURITY value
        # itself never contains a colon.
        ssid, sep, security = line.rpartition(":")
        if not sep:
            # Blank or malformed line: nothing to parse.
            continue
        if security == "":
            open_ssids.append(_unescape_nmcli_field(ssid))
    return open_ssids
def find_ap():
    """Scan repeatedly until an AP whose SSID starts with AP_PREFIX appears.

    Returns:
        The SSID of the first matching entry returned by list_aps().
        The function does not return until a match is found.
    """
    adapter = get_wifi_adapter()
    # Ensure the interface is in managed mode
    subprocess.run(["nmcli", "device", "set", adapter, "managed", "yes"])
    while True:
        print("Scanning for APs...")
        match = next(
            (ssid for ssid in list_aps() if ssid.startswith(AP_PREFIX)),
            None,
        )
        if match is not None:
            print(f"Found AP: {match}")
            return match
        # Nothing found on this pass; wait before scanning again.
        time.sleep(3)
def connect_ap(ap):
    """Join the access point *ap* via ``nmcli dev wifi connect``.

    The connection profile is given the same name as the SSID.

    Returns:
        None. Failure is reported by raising, not by a return value.

    Raises:
        ValueError: if nmcli exits with a non-zero status.
    """
    result = subprocess.run(["nmcli", "dev", "wifi", "connect", ap, "name", ap])
    if result.returncode:
        raise ValueError("Failed to connect to AP")
    print("Connected to AP")
# File, relative to the current working directory, that stores the device ID
# read by get_id() and written by write_id().
ID_PATH = pathlib.Path("light-id")
def get_id():
    """Return the next device ID: the integer stored in ID_PATH plus one.

    If ID_PATH does not exist it is first created containing "1", so the
    first call in a fresh directory returns 2. The file is not advanced here;
    that is done by write_id().
    """
    if not ID_PATH.exists():
        ID_PATH.write_text("1")
    stored = int(ID_PATH.read_text())
    return stored + 1
def write_id(id):
    """Store *id* (converted with str()) in ID_PATH, replacing its contents."""
    # The parameter name shadows the builtin; it is kept so existing callers
    # that pass it by keyword continue to work.
    text = str(id)
    ID_PATH.write_text(text)
def make_req(url):
    """Send a GET request to *url*, trying up to five times.

    Each attempt uses a 5 second timeout. A successful attempt is followed by
    a 1 second pause before returning. A failed attempt (an exception, or a
    response whose ``ok`` is false) is printed with its attempt number and
    followed by a 3 second pause.

    Raises:
        ValueError: if all attempts fail.
    """
    print("Making request: ", url)
    RETRIES = 5
    for attempt in range(1, RETRIES + 1):
        try:
            response = requests.get(url, timeout=5)
            if response.ok:
                print("Done")
                time.sleep(1)
                return
            raise ValueError(f"Failed to make request: {url}")
        except Exception as err:
            print(attempt, "Error making request:", err)
            time.sleep(3)
    raise ValueError(f"Failed to make request: {url} after {RETRIES} retries")
# Shared flag: on_message() sets "online" to True and main() polls it while
# waiting for the device.
STATE = {"online": False}
def on_message(client, userdata, message):
    """MQTT message callback: mark the device online when an ".../ip" topic arrives.

    Args:
        client: unused.
        userdata: unused.
        message: the received message; its ``topic`` and ``payload`` are
            read, and the payload is decoded as UTF-8.

    Every message is printed. When the last "/"-separated segment of the
    topic is "ip", STATE["online"] is set to True.
    """
    # Decode once and reuse; the original decoded twice and left one copy unused.
    payload = message.payload.decode("utf-8")
    print("Message received (IP Address): ", payload)
    if message.topic.split("/")[-1] == "ip":
        print("Online")
        STATE["online"] = True
def send_command(command, value):
    """Publish *value* to ``cmnd/<device_name>/<command>``, then pause one second.

    Relies on the module-level ``device_name`` that main() assigns, so it
    must not be called before main() has set it.
    """
    topic = f"cmnd/{device_name}/{command}"
    mqtt_client.publish(topic, value)
    time.sleep(1)
def main():
    """Configure one freshly flashed downlight from start to finish.

    Steps, in order:
      1. Pick the next device ID and derive the device name "downlight<ID>".
      2. Find and join the device's own access point.
      3. Configure pins, MQTT, startup command and wifi through the device's
         HTTP interface at 192.168.4.1.
      4. Connect to the MQTT broker, subscribe to "<device_name>/ip" and
         power-cycle the device by publishing "off" then "on" to SWITCH_TOPIC.
      5. Wait (without a timeout) until on_message() reports the device online.
      6. Send the name, color and power commands, then persist the ID.

    The URLs containing "here>" placeholders must be customized before use.
    Sets the module-level ``device_name`` used by send_command().
    """
    global device_name
    print(f"MQTT_HOST: {MQTT_HOST}, SWITCH_TOPIC: {SWITCH_TOPIC}")
    dev_id = get_id()
    device_name = f"downlight{dev_id}"
    print("Device name:", device_name)
    # Find the AP to connect to
    ap = find_ap()
    # Connect to the AP
    connect_ap(ap)
    # Configure the pins: every "N=..&rN=.." pair is 0 except 7=23 and 8=24
    # (the meaning of those values is defined by the firmware - not shown here).
    # The final pair used to read "0=r28" (key and value swapped); it now
    # follows the same "r28=0" pattern as the others.
    make_req(
        "http://192.168.4.1/cfg_pins?0=0&r0=0&1=0&r1=0&2=0&r2=0&3=0&r3=0&4=0&r4=0&5=0&r5=0&6=0&r6=0&7=23&r7=0&8=24&r8=0&9=0&r9=0&10=0&r10=0&11=0&r11=0&12=0&r12=0&13=0&r13=0&14=0&r14=0&15=0&r15=0&16=0&r16=0&17=0&r17=0&18=0&r18=0&19=0&r19=0&20=0&r20=0&21=0&r21=0&22=0&r22=0&23=0&r23=0&24=0&r24=0&25=0&r25=0&26=0&r26=0&27=0&r27=0&28=0&r28=0"
    )
    # Configure MQTT
    make_req(
        f"http://192.168.4.1/cfg_mqtt_set?host=<mqtt host here>&port=1883&client={device_name}&group=dining&user=&password="
    )
    # Set default startup color
    make_req(
        "http://192.168.4.1/startup_command?data=backlog+led_basecolor_rgbcw+%230000007777%3B+led_enableAll+1%3B&startup_cmd=1"
    )
    # Configure the wifi network the device should join
    make_req(
        "http://192.168.4.1/cfg_wifi_set?ssid=<your ssid here>&pass=<your wifi password here>"
    )
    mqtt_client.connect(MQTT_HOST)
    mqtt_client.on_message = on_message
    mqtt_client.loop_start()
    # The subscription is in place before the power cycle below, so the
    # device's "ip" message is seen once it comes up.
    mqtt_client.subscribe(f"{device_name}/ip")
    # Power-cycle the device through the switch topic
    print("Rebooting device...")
    mqtt_client.publish(SWITCH_TOPIC, "off")
    time.sleep(2)
    mqtt_client.publish(SWITCH_TOPIC, "on")
    # Wait for the device to connect; on_message() flips STATE["online"]
    print("Waiting for device to connect via MQTT...", end="", flush=True)
    while not STATE["online"]:
        time.sleep(1)
        print(".", end="", flush=True)
    print("Connected")
    # Color is chosen by the last character of SWITCH_TOPIC, with a default
    output_color = COLOR_MAP.get(SWITCH_TOPIC[-1], "#2200000000")
    # Set the device name
    send_command("ShortName", device_name)
    send_command("FriendlyName", device_name)
    send_command("led_basecolor_rgbcw", output_color)
    send_command("power", "1")
    # Persist the ID only after every step above has completed
    write_id(dev_id)
    print(f"Flashed device {device_name} successfully")
# Run the configuration flow only when executed as a script, not on import.
if __name__ == "__main__":
    main()