Allow passing UUID, AuthKey, and PSKKey to skip the exploit stage.

Rework payload trigger to be delayed after tuya.device.active instead of off of tuya.device.uuid.pskkey.get as that does not get called if psk is present.
Allow tuya.device.upgrade.silent.get to trigger an upgrade if mqtt has not triggered it yet, but disallow duplicate upgrade requests within a short period.
This commit is contained in:
Cossid
2023-11-10 23:57:32 -06:00
parent fc70f9da63
commit a9cc60684c
5 changed files with 148 additions and 62 deletions

View File

@@ -53,30 +53,40 @@ if ! [ -z "${FIRMWARE}" ]; then
echo "Selected Firmware: ${FIRMWARE}"
fi
# Connect to Tuya device's WiFi
echo ""
echo "================================================================================"
echo "Place your device in AP (slow blink) mode. This can usually be accomplished by either:"
echo "Power cycling off/on - 3 times and wait for the device to fast-blink, then repeat 3 more times. Some devices need 4 or 5 times on each side of the pause"
echo "Long press the power/reset button on the device until it starts fast-blinking, then releasing, and then holding the power/reset button again until the device starts slow-blinking."
echo "See https://support.tuya.com/en/help/_detail/K9hut3w10nby8 for more information."
echo "================================================================================"
echo ""
run_helper_script "pre-wifi-exploit"
wifi_connect
if [ ! $? -eq 0 ]; then
echo "Failed to connect, please run this script again"
exit 1
if ! [ -z "${AUTHKEY}" ] && ! [ -z "${UUID}" ] && ! [ -z "${PSKKEY}" ]; then
echo "Using AuthKey ${AUTHKEY} , UUID ${UUID} , and PSKKey ${PSKKEY}"
if ! [ -z "${DEVICEID}" ] && ! [ -z "${LOCALKEY}" ]; then
echo "Using DeviceId ${DEVICEID} and LocalKey ${LOCALKEY}"
fi
echo "Writing deviceconfig file..."
OUTPUT=$(run_in_docker pipenv run python3 -m cloudcutter write_deviceconfig "${PROFILE}" "${VERBOSE_OUTPUT}" --deviceid "${DEVICEID}" --localkey "${LOCALKEY}" --authkey "${AUTHKEY}" --uuid "${UUID}" --pskkey "${PSKKEY}")
else
# Connect to Tuya device's WiFi
echo ""
echo "================================================================================"
echo "Place your device in AP (slow blink) mode. This can usually be accomplished by either:"
echo "Power cycling off/on - 3 times and wait for the device to fast-blink, then repeat 3 more times. Some devices need 4 or 5 times on each side of the pause"
echo "Long press the power/reset button on the device until it starts fast-blinking, then releasing, and then holding the power/reset button again until the device starts slow-blinking."
echo "See https://support.tuya.com/en/help/_detail/K9hut3w10nby8 for more information."
echo "================================================================================"
echo ""
run_helper_script "pre-wifi-exploit"
wifi_connect
if [ ! $? -eq 0 ]; then
echo "Failed to connect, please run this script again"
exit 1
fi
# Exploit chain
echo "Waiting 1 sec to allow device to set itself up..."
sleep 1
echo "Running initial exploit toolchain..."
if ! [ -z "${DEVICEID}" ] && ! [ -z "${LOCALKEY}" ]; then
echo "Using DeviceId ${DEVICEID} and LocalKey ${LOCALKEY}"
fi
OUTPUT=$(run_in_docker pipenv run python3 -m cloudcutter exploit_device "${PROFILE}" "${VERBOSE_OUTPUT}" --deviceid "${DEVICEID}" --localkey "${LOCALKEY}")
fi
# Exploit chain
echo "Waiting 1 sec to allow device to set itself up..."
sleep 1
echo "Running initial exploit toolchain..."
if ! [ -z "${DEVICEID}" ] && ! [ -z "${LOCALKEY}" ]; then
echo "Using ${DEVICEID} and ${LOCALKEY}"
fi
OUTPUT=$(run_in_docker pipenv run python3 -m cloudcutter exploit_device "${PROFILE}" "${VERBOSE_OUTPUT}" --deviceid "${DEVICEID}" --localkey "${LOCALKEY}")
RESULT=$?
echo "${OUTPUT}"
if [ ! $RESULT -eq 0 ]; then
@@ -86,7 +96,6 @@ fi
CONFIG_DIR=$(echo "${OUTPUT}" | grep "output=" | awk -F '=' '{print $2}' | sed -e 's/\r//')
echo "Saved device config in ${CONFIG_DIR}"
# Connect to Tuya device's WiFi again, to make it connect to our hostapd AP later
echo ""
echo "================================================================================"
@@ -106,7 +115,7 @@ fi
# If the AP prefix did not change, the exploit was not successful
# End the process now to end further confusion
if [[ $AP_MATCHED_NAME != A-* ]]; then
if [[ $AP_MATCHED_NAME != A-* ]] && [ -z "${AUTHKEY}" ]; then
echo "================================================================================"
echo "[!] The profile you selected did not result in a successful exploit."
echo "================================================================================"

View File

@@ -1 +0,0 @@
{"t": 1640995200, "success": true}

View File

@@ -19,11 +19,11 @@ import tinytuya.tinytuya as tinytuya
from .crypto.pskcontext import PSKContext
from .device import DEFAULT_AUTH_KEY, DeviceConfig
from .exploit import (build_network_config_packet, exploit_device_with_config,
send_network_config_datagram)
create_device_specific_config, send_network_config_datagram)
from .protocol import handlers, mqtt
from .protocol.transformers import ResponseTransformer
pskkey_endpoint_hook_trigger_time = None
payload_trigger_time = None
def __configure_local_device_response_transformers(config):
@@ -81,10 +81,6 @@ def __trigger_firmware_update(config: DeviceConfig, args):
local_key = config.get(DeviceConfig.LOCAL_KEY)
mqtt.trigger_firmware_update(device_id=device_id, local_key=local_key, protocol="2.2", broker="127.0.0.1", verbose_output=args.verbose_output)
timestamp = ""
if args.verbose_output:
timestamp = str(datetime.now().time()) + " "
print(f"[{timestamp}MQTT Sending] Triggering firmware update message.")
def __configure_local_device_or_update_firmware(args, update_firmware: bool = False):
@@ -97,8 +93,8 @@ def __configure_local_device_or_update_firmware(args, update_firmware: bool = Fa
sys.exit(30)
config = DeviceConfig.read(args.config)
authkey, uuid = config.get_bytes(DeviceConfig.AUTH_KEY, default=DEFAULT_AUTH_KEY), config.get_bytes(DeviceConfig.UUID)
context = PSKContext(authkey=authkey, uuid=uuid)
authkey, uuid, pskkey = config.get_bytes(DeviceConfig.AUTH_KEY, default=DEFAULT_AUTH_KEY), config.get_bytes(DeviceConfig.UUID), config.get_bytes(DeviceConfig.PSK, default=None)
context = PSKContext(authkey=authkey, uuid=uuid, psk=pskkey)
device_id, local_key = config.get(DeviceConfig.DEVICE_ID), config.get(DeviceConfig.LOCAL_KEY)
flash_timeout = 15
if args.flash_timeout is not None:
@@ -109,30 +105,26 @@ def __configure_local_device_or_update_firmware(args, update_firmware: bool = Fa
combined = json.load(f)
device = combined["device"]
def pskkey_endpoint_hook(handler, *_):
"""
Hooks into an endpoint response for the device uuid pskkey get, the apparent last call in standard activation,
and less likely to double-trigger in firmware updates (where dynamic config usually gets called twice).
Standard response should not be overwritten, but needs to register a task
to either change device SSID or update firmware. Hence, return None.
"""
def trigger_payload_endpoint_hook(handler, *_):
if update_firmware:
task_function = __trigger_firmware_update
task_args = (config, args)
else:
task_args = (handler.request.remote_ip, config, args.ssid, args.password)
task_function = __configure_ssid_on_device
global pskkey_endpoint_hook_trigger_time
# Don't allow duplicates in a short period of time, but allow re-triggering if a new connection is made.
if pskkey_endpoint_hook_trigger_time is None or pskkey_endpoint_hook_trigger_time + timedelta(minutes=1) < datetime.now():
pskkey_endpoint_hook_trigger_time = datetime.now()
if update_firmware:
task_function = __trigger_firmware_update
task_args = (config, args)
else:
task_args = (handler.request.remote_ip, config, args.ssid, args.password)
task_function = __configure_ssid_on_device
tornado.ioloop.IOLoop.current().call_later(0, task_function, *task_args)
tornado.ioloop.IOLoop.current().call_later(0, task_function, *task_args)
return None
def upgrade_endpoint_hook(handler, *_):
global payload_trigger_time
# Don't allow duplicates in a short period of time, but allow re-triggering if a new connection is made.
if payload_trigger_time is not None and payload_trigger_time + timedelta(minutes=1) > datetime.now():
print("Discarding duplicate upgrade request to avoid race condition.")
return { "result": { "success": True, "t": int(time.time()) }}
payload_trigger_time = datetime.now()
with open(args.firmware, "rb") as fs:
upgrade_data = fs.read()
sec_key = config.get_bytes(DeviceConfig.SEC_KEY)
@@ -153,7 +145,14 @@ def __configure_local_device_or_update_firmware(args, update_firmware: bool = Fa
}
def active_endpoint_hook(handler, *_):
# active should reset payload trigger time, in case the device reconnected and asked to activate.
global payload_trigger_time
payload_trigger_time = None
schema_id, schema = list(device["schemas"].items())[0]
# Trigger the payload after active has fully registered.
tornado.ioloop.IOLoop.current().call_later(2, trigger_payload_endpoint_hook, *(handler, None))
return {
"result": {
"schema": json.dumps(schema, separators=(',', ':')),
@@ -174,16 +173,12 @@ def __configure_local_device_or_update_firmware(args, update_firmware: bool = Fa
response_transformers = __configure_local_device_response_transformers(config)
endpoint_hooks = {
"tuya.device.active": active_endpoint_hook,
"tuya.device.uuid.pskkey.get": pskkey_endpoint_hook,
}
if update_firmware:
endpoint_hooks.update({
"tuya.device.upgrade.get": upgrade_endpoint_hook,
# Don't hook tuya.device.upgrade.silent.get as an actual upgrade path. There is a schema which will respond with no upgrade instead.
# tuya.device.upgrade.silent.get is reliable/inconsistent, and may interfer with another upgrade already in progress.
# We trigger the non-silent variety by mqtt in a more controlled way.
# "tuya.device.upgrade.silent.get": upgrade_endpoint_hook,
"tuya.device.upgrade.silent.get": upgrade_endpoint_hook,
})
application = tornado.web.Application([
@@ -269,6 +264,29 @@ def __exploit_device(args):
print(f"output={output_path}")
def __write_deviceconfig(args):
output_dir = args.output_directory
if not (os.path.exists(output_dir) and os.path.isdir(output_dir)):
print(f"Provided output directory {output_dir} does not exist or not a directory", file=sys.stderr)
sys.exit(60)
try:
with open(args.profile, "r") as fs:
combined = json.load(fs)
except (OSError, KeyError):
print(f"Could not load profile {args.profile}. Are you sure the profile file exists and is a valid combined JSON?", file=sys.stderr)
sys.exit(65)
device_config = create_device_specific_config(args, combined, args.uuid, args.auth_key, args.psk_key)
output_path = os.path.join(output_dir, f"{args.uuid}.deviceconfig")
device_config.write(output_path)
print("Saved device config.")
# To communicate with external scripts
print(f"output={output_path}")
def __configure_wifi(args):
SSID = args.SSID
password = args.password
@@ -386,6 +404,61 @@ def parse_args():
)
parser_exploit_device.set_defaults(handler=__exploit_device)
parser_write_deviceconfig = subparsers.add_parser(
"write_deviceconfig",
help="Write the deviceconfig to use to for Tuya API emulation."
)
parser_write_deviceconfig.add_argument("profile", help="Device profile JSON file (combined)")
parser_write_deviceconfig.add_argument("verbose_output", help="Flag for more verbose output, 'true' for verbose output", type=bool)
parser_write_deviceconfig.add_argument(
"--output-directory",
dest="output_directory",
required=False,
default="/work/configured-devices",
help="A directory to which the modified device parameters file will be written (default: <workdir>/configured-devices)"
)
parser_write_deviceconfig.add_argument(
"--deviceid",
dest="device_id",
required=False,
default="",
help="deviceid assigned to the device (default: Random)",
type=__validate_localapicredential_arg(20),
)
parser_write_deviceconfig.add_argument(
"--localkey",
dest="local_key",
required=False,
default="",
help="localkey assigned to the device (default: Random)",
type=__validate_localapicredential_arg(16),
)
parser_write_deviceconfig.add_argument(
"--authkey",
dest="auth_key",
required=True,
default="",
help="authkey assigned to the device (default: Random)",
type=__validate_localapicredential_arg(32),
)
parser_write_deviceconfig.add_argument(
"--uuid",
dest="uuid",
required=True,
default="",
help="uuid assigned to the device (default: Random)",
type=__validate_localapicredential_arg(16),
)
parser_write_deviceconfig.add_argument(
"--pskkey",
dest="psk_key",
required=True,
default="",
help="pskkey assigned to the device (default: Random)",
type=__validate_localapicredential_arg(37),
)
parser_write_deviceconfig.set_defaults(handler=__write_deviceconfig)
parser_configure_wifi = subparsers.add_parser(
"configure_wifi",
help="Makes a device to which you're connected via its AP mode join a given WiFi network"

View File

@@ -65,7 +65,7 @@ def generate_random_ascii_string(length):
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
def create_device_specific_config(args, combined, uuid, auth_key) -> DeviceConfig:
def create_device_specific_config(args, combined, uuid, auth_key, psk_key = None) -> DeviceConfig:
config = DeviceConfig({})
config.set(DeviceConfig.UUID, uuid)
@@ -76,9 +76,8 @@ def create_device_specific_config(args, combined, uuid, auth_key) -> DeviceConfi
config.set(DeviceConfig.CHIP_FAMILY, combined["profile"]['firmware']['chip'].upper())
config.set(DeviceConfig.PROFILE, combined["profile"]["name"] + " / " + combined["profile"]["sub_name"])
config.set(DeviceConfig.DEVICE, combined['slug'])
# Currently not used
# config.set(DeviceConfig.PSK, generate_random_ascii_string(PSK_LENGTH))
if psk_key is not None:
config.set(DeviceConfig.PSK, psk_key)
return config

View File

@@ -12,7 +12,7 @@ function getopts-extra () {
done
}
while getopts "hrntvw:p:f:d:l:s::" flag; do
while getopts "hrntvw:p:f:d:l:s::a:k:u:" flag; do
case "$flag" in
r) RESETNM="true";;
n) DISABLE_RESCAN="true";;
@@ -25,6 +25,9 @@ while getopts "hrntvw:p:f:d:l:s::" flag; do
t) FLASH_TIMEOUT=${OPTARG};;
d) DEVICEID=${OPTARG};;
l) LOCALKEY=${OPTARG};;
a) AUTHKEY=${OPTARG};;
k) PSKKEY=${OPTARG};;
u) UUID=${OPTARG};;
s) getopts-extra "$@"
METHOD_DETACH="true"
HAVE_SSID="true"
@@ -40,6 +43,9 @@ while getopts "hrntvw:p:f:d:l:s::" flag; do
echo " -v Verbose log output"
echo " -w TEXT WiFi adapter name (optional, auto-selected if not supplied)"
echo " -p TEXT Device profile name, AKA Device Slug (optional)"
echo " -a TEXT AuthKey of the device (optional, requires UUID and PSKKey accompanied with it)"
echo " -k TEXT PSKKey of the device (optinal, requires AuthKey and UUID accompanied with it)"
echo " -u TEXT UUID of the device (optional, requires AuthKey and PSKKey accompanied with it)"
echo ""
echo "==== Detaching Only: ===="
echo " -s SSID PASSWORD Wifi SSID and Password to use for detaching. Use quotes if either value contains spaces. Certain special characters may need to be escaped with '\\'"