mirror of
https://github.com/tuya-cloudcutter/tuya-cloudcutter.git
synced 2026-02-19 21:51:18 +01:00
274 lines
9.6 KiB
Python
274 lines
9.6 KiB
Python
#!/usr/bin/env python3
|
|
|
|
##
|
|
# pull_active_response.py
|
|
# Get response for the tuya.device.active endpoint
|
|
# from server for use in device profiles.
|
|
#
|
|
# Has been tested with light bulbs, and some of
|
|
# the parameters may be off for other devices, but
|
|
# the general concept can be replicated if needed.
|
|
#
|
|
# Requires a valid device uuid, authkey, product
|
|
# key from a firmware dump as well as a valid activation
|
|
# token. Official mobile apps can generate valid tokens
|
|
# which can be sniffed over the network for use.
|
|
#
|
|
##
|
|
import json
|
|
import os
|
|
import socket
|
|
import struct
|
|
import sys
|
|
import threading
|
|
import time
|
|
|
|
from tuya_api_connection import TuyaAPIConnection
|
|
|
|
global multicast_token, cancel_thread
|
|
multicast_token = None
|
|
cancel_thread = False
|
|
|
|
|
|
def print_help():
|
|
print('Usage: python check_upgrade.py --input <uuid> <auth_key> <dev_id> <sec_key> <token>')
|
|
print(' or: python check_upgrade.py --directory <directory> <token>')
|
|
sys.exit(1)
|
|
|
|
|
|
def read_single_line_file(path):
|
|
with open(path, 'r') as file:
|
|
fileContents = file.read()
|
|
if fileContents.__contains__('\n'):
|
|
return None
|
|
return fileContents
|
|
|
|
|
|
def print_and_exit(printText):
|
|
print(printText)
|
|
sys.exit(2)
|
|
|
|
|
|
def build_params(epoch_time, devId):
|
|
params = {
|
|
"a": "tuya.device.upgrade.get",
|
|
"et": 1,
|
|
"t": epoch_time,
|
|
"devId": devId,
|
|
"v": "4.3",
|
|
}
|
|
|
|
return params
|
|
|
|
|
|
def build_data(epoch_time, fwtype = '0'):
|
|
data = {
|
|
'type': fwtype,
|
|
't': epoch_time
|
|
}
|
|
|
|
return data
|
|
|
|
|
|
def get_new_token():
|
|
print('[!] No token provided.')
|
|
print("[!] On any device on the same network as you're device running this script, please log into the Smart Life app ('Try as Guest' works fine if you do not already have an account)")
|
|
print("[!] Note: no real device should currently be in pairing mode, or the instructions below may not match.")
|
|
print("[!] - Start the add device procedure (hit '+' in the upper-right and select 'Add Device')")
|
|
print("[!] - Under 'Add Manually' select 'Socket (Wi-Fi)'")
|
|
print("[!] - Enter your network credentials as instructed")
|
|
print("[!] - Do not follow any of the instructions about putting a device into pairing mode, instead select 'next' until it asks the status of the indicator and select 'Blink Slowly'")
|
|
print("[!] - Select 'Go to Connect', then in your wifi selection screen, hit the back button to return to Smart Life.")
|
|
print("[!] A new token should be sent to your network, and this script will continue.")
|
|
print('[!] Note: this will join an unresponsive device to your account. You can safely delete it afterwards.')
|
|
print('[+] Waiting for multicast token from app...')
|
|
|
|
global multicast_token, cancel_thread
|
|
|
|
try:
|
|
thread = threading.Thread(target=receive_token, args=[])
|
|
thread.start()
|
|
while multicast_token is None:
|
|
time.sleep(0.25)
|
|
pass
|
|
except:
|
|
cancel_thread = True
|
|
print('[!] Cancelled waiting for token.')
|
|
|
|
return multicast_token
|
|
|
|
|
|
def receive_token():
|
|
global multicast_token, cancel_thread
|
|
received_token = False
|
|
while received_token == False and cancel_thread == False:
|
|
s = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
|
|
s.bind(('0.0.0.0', 6669))
|
|
s.settimeout(2)
|
|
try:
|
|
# despite suggestions of being unused, addr must remain present, or this will fail
|
|
msg, addr = s.recvfrom(255)
|
|
(msglen,) = struct.unpack(">I", msg[12:16])
|
|
msg = msg[16: msglen + 8].decode()
|
|
msg = json.loads(msg)
|
|
token = msg["token"]
|
|
received_token = True
|
|
s.close()
|
|
multicast_token = token
|
|
except KeyboardInterrupt:
|
|
return
|
|
except:
|
|
pass
|
|
|
|
|
|
def run(directory: str, output_file_prefix: str, uuid: str, auth_key: str, dev_id: str, sec_key: str, token: str = None):
|
|
if uuid is None or len(uuid) != 16:
|
|
print_and_exit('required uuid was not found or was invalid (expected 16 characters)')
|
|
if auth_key is None or len(auth_key) != 32:
|
|
print_and_exit('required auth_key was not found or was invalid (expected 32 characters)')
|
|
if dev_id is None or len(dev_id) != 22:
|
|
print_and_exit('required dev_id was not found or was invalid (expected 22 characters)')
|
|
if sec_key is None or len(sec_key) != 16:
|
|
print_and_exit('required sec_key was not found or was invalid (expected 16 characters)')
|
|
|
|
if token is None or len(token) != 14:
|
|
token = get_new_token()
|
|
|
|
if token is None:
|
|
print_and_exit('[!] Error receiving new token.')
|
|
|
|
region = token[:2]
|
|
|
|
# Region information found at: https://airtake-public-data.oss-cn-hangzhou.aliyuncs.com/goat/pdf/1582271993811/Tuya%20Smart%20Cloud%20Platform%20Overview_Tuya%20Smart_Docs.pdf
|
|
# AZ American west AWS Oregan Main Machine Room
|
|
# UEAZ American east AZURE Virginia Machine Room
|
|
if region == "AZ" or region == "UE":
|
|
region = "us"
|
|
# EU Europe AWS Frankfurt Machine Room
|
|
elif region == "EU":
|
|
region = "eu"
|
|
# AY Asia Tencent ShangHai Core Machine Room
|
|
elif region == "AY":
|
|
region = "cn"
|
|
# IN Indian AWS Mumbai Machine Room
|
|
elif region == "IN":
|
|
region = "in"
|
|
else:
|
|
print(f"[!] Unable to determine region from token provided (prefix {region})")
|
|
sys.exit(4)
|
|
|
|
reduced_token = token[2:]
|
|
reduced_token = reduced_token[:8]
|
|
assert len(reduced_token) == 8
|
|
print(f'Using token: {token} uuid: {uuid} sec_key: {sec_key}')
|
|
# tuya.device.upgrade.get encrypts with sec_key
|
|
connection = TuyaAPIConnection(uuid, sec_key)
|
|
url = f"http://a.tuya{region}.com/d.json"
|
|
epoch_time = int(time.time())
|
|
params = build_params(epoch_time, dev_id)
|
|
response = None
|
|
requestType = "POST"
|
|
|
|
# Wifi firmware, type 0
|
|
data = build_data(epoch_time, 0)
|
|
response = connection.request(url, params, data, requestType)
|
|
|
|
if response["success"] == True:
|
|
if response.get('result') is not None:
|
|
wifi_version = response['result']['version']
|
|
firmware_wifi_upgrade_url = response['result']['url']
|
|
print("[+] Wifi Firmware update available:")
|
|
print(f"[+] Version: {wifi_version}")
|
|
print(f"[+] Url: {firmware_wifi_upgrade_url}")
|
|
with open(os.path.join(directory, output_file_prefix + f"_firmware_wifi_{wifi_version}.txt"), 'w') as f:
|
|
f.write(firmware_wifi_upgrade_url)
|
|
else:
|
|
print("[+] No Wifi firmware update available.")
|
|
elif response["success"] == False and response["errorCode"] == 'EXPIRE':
|
|
print("[!] The token provided has either expired, or you are connected to the wrong region")
|
|
else:
|
|
print(response)
|
|
|
|
# MCU firmware, type 9
|
|
data = build_data(epoch_time, 9)
|
|
response = connection.request(url, params, data, requestType)
|
|
|
|
if response["success"] == True:
|
|
if response.get('result') is not None:
|
|
mcu_version = response['result']['version']
|
|
firmware_mcu_upgrade_url = response['result']['url']
|
|
print("[+] MCU Firmware update available:")
|
|
print(f"[+] Version: {mcu_version}")
|
|
print(f"[+] Url: {firmware_mcu_upgrade_url}")
|
|
with open(os.path.join(directory, output_file_prefix + f"_firmware_mcu_{mcu_version}.txt"), 'w') as f:
|
|
f.write(firmware_mcu_upgrade_url)
|
|
else:
|
|
print("[+] No MCU firmware update available.")
|
|
elif response["success"] == False and response["errorCode"] == 'EXPIRE':
|
|
print("[!] The token provided has either expired, or you are connected to the wrong region")
|
|
else:
|
|
print(response)
|
|
|
|
def run_input(uuid, auth_key, dev_id, sec_key, token=None):
|
|
run('.\\', 'device', uuid, auth_key, dev_id, sec_key, token)
|
|
|
|
|
|
def run_directory(directory, token=None):
|
|
uuid = None
|
|
auth_key = None
|
|
dev_id = None
|
|
sec_key = None
|
|
output_file_prefix = None
|
|
|
|
dirListing = os.listdir(f'{directory}')
|
|
|
|
for file in dirListing:
|
|
if file.endswith('_uuid.txt'):
|
|
uuid = read_single_line_file(os.path.join(directory, file))
|
|
output_file_prefix = file.replace('_uuid.txt', '')
|
|
elif file.endswith('_auth_key.txt'):
|
|
auth_key = read_single_line_file(os.path.join(directory, file))
|
|
elif file.endswith('_dev_id.txt'):
|
|
dev_id = read_single_line_file(os.path.join(directory, file))
|
|
elif file.endswith('_sec_key.txt'):
|
|
sec_key = read_single_line_file(os.path.join(directory, file))
|
|
|
|
if uuid is None:
|
|
print('[!] uuid was not found')
|
|
return
|
|
if auth_key is None:
|
|
print('[!] auth_key was not found')
|
|
return
|
|
if dev_id is None:
|
|
print('[!] dev_id was not found')
|
|
return
|
|
if sec_key is None:
|
|
print('[!] sec_key was not found')
|
|
return
|
|
|
|
run(directory, output_file_prefix, uuid, auth_key, dev_id, sec_key, token)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
if (sys.argv[2:]):
|
|
if sys.argv[1] == '--input':
|
|
if not sys.argv[7:]:
|
|
print('Unrecognized input.')
|
|
print_help()
|
|
uuid = sys.argv[2]
|
|
auth_key = sys.argv[3]
|
|
dev_id = sys.argv[4]
|
|
sec_key = sys.argv[5]
|
|
token = sys.argv[9]
|
|
run_input(uuid, auth_key, dev_id, sec_key, token)
|
|
elif sys.argv[1] == '--directory':
|
|
if not sys.argv[2:]:
|
|
print('Unrecognized input.')
|
|
print_help()
|
|
directory = sys.argv[2]
|
|
token = (None if len(sys.argv) < 4 else sys.argv[3])
|
|
run_directory(directory, token)
|
|
else:
|
|
print_help()
|