mirror of
https://github.com/tuya-cloudcutter/tuya-cloudcutter.git
synced 2026-03-03 11:24:00 +01:00
* Helper script support --------- Co-authored-by: Sillyfrog <sillyfrog@users.noreply.github.com>
220 lines
5.6 KiB
Python
220 lines
5.6 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
This example script configures an RGB downlight that has just been flashed with the
|
|
OpenBK7231N_App firmware.
|
|
|
|
It was used on a Ubuntu 20.04 machine, but should be customized for your own use case.
|
|
Search this file for "here>" to find the places that need to be customized.
|
|
|
|
See scripts/README.md for example usage
|
|
|
|
|
|
You will need to install requests and paho-mqtt:
|
|
pip install requests paho-mqtt
|
|
"""
|
|
import subprocess
|
|
import os
|
|
import time
|
|
import requests
|
|
import pathlib
|
|
|
|
import paho.mqtt.client as mqtt
|
|
|
|
AP_PREFIX = "OpenBK7231N_"
|
|
|
|
MQTT_HOST = os.environ["MQTT_HOST"]
|
|
SWITCH_TOPIC = os.environ["SWITCH_TOPIC"]
|
|
|
|
|
|
mqtt_client = mqtt.Client()
|
|
|
|
|
|
COLOR_MAP = {
|
|
"1": "#2200000000",
|
|
"2": "#0022000000",
|
|
"3": "#0000220000",
|
|
"4": "#2200220000",
|
|
}
|
|
|
|
|
|
def get_wifi_adapter():
|
|
# Get the wifi adapter name
|
|
# Return the adapter name
|
|
p = subprocess.run(
|
|
["nmcli", "dev", "status"],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
)
|
|
for l in p.stdout.splitlines():
|
|
if " wifi " in l:
|
|
return l.split()[0]
|
|
raise ValueError("Failed to find wifi adapter")
|
|
|
|
|
|
def list_aps():
|
|
# List the available APs
|
|
# Return a list of APs
|
|
|
|
subprocess.run(["nmcli", "radio", "wifi", "off"])
|
|
time.sleep(1)
|
|
subprocess.run(["nmcli", "radio", "wifi", "on"])
|
|
time.sleep(1)
|
|
|
|
p = subprocess.run(
|
|
[
|
|
"nmcli",
|
|
"-t",
|
|
"-f",
|
|
"SSID,SECURITY",
|
|
"dev",
|
|
"wifi",
|
|
"list",
|
|
"--rescan",
|
|
"yes",
|
|
],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
)
|
|
open_ssids = []
|
|
for l in p.stdout.splitlines():
|
|
ssid, security = l.split(":")
|
|
if security == "":
|
|
open_ssids.append(ssid)
|
|
return open_ssids
|
|
|
|
|
|
def find_ap():
|
|
# Ensure the interface is in managed mode
|
|
subprocess.run(["nmcli", "device", "set", get_wifi_adapter(), "managed", "yes"])
|
|
while 1:
|
|
print("Scanning for APs...")
|
|
for ap in list_aps():
|
|
if ap.startswith(AP_PREFIX):
|
|
print(f"Found AP: {ap}")
|
|
return ap
|
|
time.sleep(3)
|
|
|
|
|
|
def connect_ap(ap):
|
|
# Connect to the AP
|
|
# Return True if successful, False otherwise
|
|
p = subprocess.run(
|
|
["nmcli", "dev", "wifi", "connect", ap, "name", ap],
|
|
)
|
|
if p.returncode != 0:
|
|
raise ValueError("Failed to connect to AP")
|
|
print("Connected to AP")
|
|
|
|
|
|
ID_PATH = pathlib.Path("light-id")
|
|
|
|
|
|
def get_id():
|
|
if not ID_PATH.exists():
|
|
ID_PATH.write_text("1")
|
|
return int(ID_PATH.read_text()) + 1
|
|
|
|
|
|
def write_id(id):
|
|
ID_PATH.write_text(str(id))
|
|
|
|
|
|
def make_req(url):
|
|
print("Making request: ", url)
|
|
RETRIES = 5
|
|
for i in range(RETRIES):
|
|
try:
|
|
r = requests.get(url, timeout=5)
|
|
if not r.ok:
|
|
raise ValueError(f"Failed to make request: {url}")
|
|
print("Done")
|
|
time.sleep(1)
|
|
return
|
|
except Exception as e:
|
|
print(i + 1, "Error making request:", e)
|
|
time.sleep(3)
|
|
|
|
raise ValueError(f"Failed to make request: {url} after {RETRIES} retries")
|
|
|
|
|
|
STATE = {"online": False}
|
|
|
|
|
|
def on_message(client, userdata, message):
|
|
print("Message received (IP Address): ", str(message.payload.decode("utf-8")))
|
|
parts = message.topic.split("/")
|
|
msg = message.payload.decode("utf-8")
|
|
if parts[-1] == "ip":
|
|
print("Online")
|
|
STATE["online"] = True
|
|
|
|
|
|
def send_command(command, value):
|
|
mqtt_client.publish(f"cmnd/{device_name}/{command}", value)
|
|
time.sleep(1)
|
|
|
|
|
|
def main():
|
|
global device_name
|
|
|
|
print(f"MQTT_HOST: {MQTT_HOST}, SWITCH_TOPIC: {SWITCH_TOPIC}")
|
|
dev_id = get_id()
|
|
device_name = f"downlight{dev_id}"
|
|
print("Device name:", device_name)
|
|
# Find the AP to connect to
|
|
ap = find_ap()
|
|
# Connect to the AP
|
|
connect_ap(ap)
|
|
|
|
# Configure the pins
|
|
make_req(
|
|
"http://192.168.4.1/cfg_pins?0=0&r0=0&1=0&r1=0&2=0&r2=0&3=0&r3=0&4=0&r4=0&5=0&r5=0&6=0&r6=0&7=23&r7=0&8=24&r8=0&9=0&r9=0&10=0&r10=0&11=0&r11=0&12=0&r12=0&13=0&r13=0&14=0&r14=0&15=0&r15=0&16=0&r16=0&17=0&r17=0&18=0&r18=0&19=0&r19=0&20=0&r20=0&21=0&r21=0&22=0&r22=0&23=0&r23=0&24=0&r24=0&25=0&r25=0&26=0&r26=0&27=0&r27=0&28=0&0=r28"
|
|
)
|
|
# Configure MQTT
|
|
make_req(
|
|
f"http://192.168.4.1/cfg_mqtt_set?host=<mqtt host here>&port=1883&client={device_name}&group=dining&user=&password="
|
|
)
|
|
# Set default startup color
|
|
make_req(
|
|
"http://192.168.4.1/startup_command?data=backlog+led_basecolor_rgbcw+%230000007777%3B+led_enableAll+1%3B&startup_cmd=1"
|
|
)
|
|
|
|
make_req(
|
|
"http://192.168.4.1/cfg_wifi_set?ssid=<your ssid here>&pass=<your wifi password here>"
|
|
)
|
|
|
|
mqtt_client.connect(MQTT_HOST)
|
|
mqtt_client.on_message = on_message
|
|
mqtt_client.loop_start()
|
|
mqtt_client.subscribe(f"{device_name}/ip")
|
|
|
|
# Turn off the switch
|
|
print("Rebooting device...")
|
|
mqtt_client.publish(SWITCH_TOPIC, "off")
|
|
time.sleep(2)
|
|
mqtt_client.publish(SWITCH_TOPIC, "on")
|
|
|
|
# Wait for the device to connect
|
|
print("Waiting for device to connect via MQTT...", end="", flush=True)
|
|
while not STATE["online"]:
|
|
time.sleep(1)
|
|
print(".", end="", flush=True)
|
|
print("Connected")
|
|
|
|
output_color = COLOR_MAP.get(SWITCH_TOPIC[-1], "#2200000000")
|
|
|
|
# Set the device name
|
|
send_command("ShortName", device_name)
|
|
send_command("FriendlyName", device_name)
|
|
send_command("led_basecolor_rgbcw", output_color)
|
|
send_command("power", "1")
|
|
|
|
write_id(dev_id)
|
|
print(f"Flashed device {device_name} successfully")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|