Initial integration of remote OTA from old proxy code

This commit is contained in:
Khaled Nassar
2022-03-31 19:47:10 +02:00
parent f010590db6
commit 1e42e5e359

View File

@@ -1,8 +1,10 @@
import argparse
import hmac
import json
import os
import sys
import time
from hashlib import sha256
import tinytuya.tinytuya as tinytuya
import tornado.httpserver
@@ -11,7 +13,9 @@ import tornado.web
from .crypto.pskcontext import PSKContext
from .device import DEFAULT_AUTH_KEY, DEVICE_PROFILE_FILE_NAME, DeviceConfig
from .exploit import exploit_device_with_config, build_network_config_packet, send_network_config_datagram
from .exploit import (build_network_config_packet, exploit_device_with_config,
send_network_config_datagram)
from .protocol import mqtt
from .protocol.handlers import DetachHandler, GetURLHandler
from .protocol.transformers import ResponseTransformer
@@ -60,7 +64,16 @@ def __configure_ssid_on_device(ip: str, config: DeviceConfig, ssid: str, passwor
print(f"Exeception: {repr(e)}")
sys.exit(90)
def __configure_local_device(args):
def __trigger_firmware_update(config: DeviceConfig):
device_id = config.get(DeviceConfig.DEVICE_ID)
local_key = config.get(DeviceConfig.LOCAL_KEY)
mqtt.trigger_firmware_update(device_id=device_id, local_key=local_key, protocol="2.2", broker="127.0.0.1")
print("Firmware update messages triggered. Wait for callack from device")
def __configure_local_device_or_update_firmware(args, update_firmare: bool = False):
if not os.path.exists(args.config):
print(f"Configuration file {args.config} does not exist", file=sys.stderr)
sys.exit(10)
@@ -77,19 +90,51 @@ def __configure_local_device(args):
authkey, uuid = config.get_bytes(DeviceConfig.AUTH_KEY, default=DEFAULT_AUTH_KEY), config.get_bytes(DeviceConfig.UUID)
context = PSKContext(authkey=authkey, uuid=uuid)
ssid, password = args.ssid, args.password
def dynamic_config_endpoint_hook(handler, *_):
"""
Hooks into an endpoint response for the dynamic config. Standard response should not be overwritten, but needs to
register a task to change device SSID. Hence, return None.
register a task to either changed device SSID or update firmware. Hence, return None.
"""
tornado.ioloop.IOLoop.current().call_later(5.0, __configure_ssid_on_device, handler.request.remote_ip, config, ssid, password)
if update_firmare:
task_function = __trigger_firmware_update
task_args = (config, )
else:
task_args = (handler.request.remote_ip, config, args.ssid, args.password)
task_function = __configure_ssid_on_device
tornado.ioloop.IOLoop.current().call_later(5.0, task_function, *task_args)
return None
def upgrade_endpoint_hook(handler, *_):
with open(args.firmware, "rb") as fs:
upgrade_data = fs.read()
sec_key = config.get_bytes(DeviceConfig.SEC_KEY)
file_sha = sha256(upgrade_data).hexdigest().upper().encode("utf-8")
file_hmac = hmac.digest(sec_key, file_sha, sha256).hex().upper()
firmware_filename = os.path.basename(args.firmware)
return {
"result": {
"url": f"http://{args.ip}:80/files/{firmware_filename}",
"hmac": file_hmac,
"version": "9.0.0",
"size": str(len(upgrade_data)),
"type": 0,
},
"success": True,
"t": int(time.time())
}
response_transformers = __configure_local_device_response_transformers(config)
endpoint_hooks = {"tuya.device.dynamic.config.get": dynamic_config_endpoint_hook}
if update_firmare:
endpoint_hooks.update({
"tuya.device.upgrade.silent.get": upgrade_endpoint_hook,
"tuya.device.upgrade.get": upgrade_endpoint_hook
})
# TODO: Add a StaticFileHandler under /files/* for firmware update files
application = tornado.web.Application([
(r'/v1/url_config', GetURLHandler, dict(ipaddr=args.ip)),
(r'/v2/url_config', GetURLHandler, dict(ipaddr=args.ip)),
@@ -106,8 +151,7 @@ def __configure_local_device(args):
def __update_firmware(args):
print("Update not implemented yet", file=sys.stderr)
sys.exit(50)
__configure_local_device_or_update_firmware(args, update_firmare=True)
def __exploit_device(args):
@@ -176,7 +220,7 @@ def parse_args():
default="",
help="Password of the SSID for device onboarding (default: empty)",
)
parser_configure.set_defaults(handler=__configure_local_device)
parser_configure.set_defaults(handler=__configure_local_device_or_update_firmware)
parser_update_firmware = subparsers.add_parser("update_firmware", help="Update the device's firmware")
parser_update_firmware.add_argument("config", help="Device configuration file")