diff --git a/profile-building/tuya_api_connection.py b/profile-building/tuya_api_connection.py index 4f9efd9..f5efdff 100644 --- a/profile-building/tuya_api_connection.py +++ b/profile-building/tuya_api_connection.py @@ -1,59 +1,66 @@ import base64 import json -import random -import socket -import ssl +import requests import sys -from hashlib import md5, sha256 +import traceback +from hashlib import md5 from urllib.parse import urlparse import Crypto.Util.Padding as padding -import ssl from Crypto.Cipher import AES class TuyaAPIConnection(object): - def __init__(self, uuid: str, auth_key: str, psk: str = None): - self.psk = psk.encode('utf-8') if psk else b'' + def __init__(self, uuid: str, auth_key: str): self.authkey = auth_key.encode('utf-8') self.uuid = uuid.encode('utf-8') def request(self, url: str, params: dict, data: dict, method: str = 'POST') -> dict: parsed_url = urlparse(url) hostname = parsed_url.hostname - psk_wrapped = parsed_url.scheme == "https" - port = parsed_url.port or (443 if psk_wrapped else 80) querystring = self._build_querystring(params) - requestline = f"{parsed_url.path}{querystring}" - # print(requestline) body = self._encrypt_data(data) - # print(data) - http_request = self._build_request(method, hostname, requestline, body) - with self._make_socket(hostname, port, psk_wrapped) as socket: - try: - socket.send(http_request) - datas = socket.recv(10000) - response_body = datas.split(b"\r\n\r\n")[1].decode("utf-8").strip() - # print(response_body) - response_body_json = None - response_body_json = json.loads(response_body) - result = response_body_json["result"] - result = base64.b64decode(result) - result = self._decrypt_data(result) - result = result.decode('utf-8') - result = json.loads(result) - # print(result) - return result - except Exception as exception: - print("[!] Unable to get a response from Tuya API, or response was malformed.") - print(f"[!] {url} must not be blocked in order to pull data from the Tuya API.") - if (response_body_json is not None and 'errorCode' in response_body_json): - print(f"[!] Error message: {response_body_json['errorCode']}") - else: - print(f"[!] Response Body: {response_body}") - print(f"[!] Exception: {exception}") - sys.exit(3) + # print(f"[+] Url: {url + querystring}") + # print(f"[+] Parameters: {params}") + # print(f"[+] Body: {body}") + # print(f"[+] Unencrypted Body: {data}") + + try: + response_body = None + response_body_json = None + user_agent = "TUYA_IOT_SDK" # Older alternate: ESP8266SDK + headers = {"Host":f"{hostname}","User-Agent":f"{user_agent}","Connection":"keep-alive"} + match method.upper(): + case "GET": + # print(f"[+] Headers: {headers}") + response = requests.get(url + querystring, headers=headers) + case "POST": + headers["Content-Type"] = "application/x-www-form-urlencoded; charset=UTF-8" + headers["Content-Length"] = str(len(body)) + # print(f"[+] Headers: {headers}") + response = requests.post(url + querystring, data=body, headers=headers) + response_body = response.text.strip() + # print(response_body) + response_body_json = json.loads(response_body) + result = response_body_json["result"] + result = base64.b64decode(result) + result = self._decrypt_data(result) + result = result.decode('utf-8') + result = json.loads(result) + # print(result) + return result + except Exception as exception: + print(traceback.format_exc()) + print("[!] Unable to get a response from Tuya API, or response was malformed.") + print(f"[!] {url} must not be blocked in order to pull data from the Tuya API.") + if (response_body_json is not None and 'errorCode' in response_body_json): + print(f"[!] Error message: {response_body_json['errorCode']}") + else: + print(f"[!] Response Body: {response_body}") + print(f"[!] Exception: {exception}") + # print(f"[!] Response: {response.text}") + sys.exit(3) def _encrypt_data(self, data: dict): jsondata = json.dumps(data, separators=(",", ":")) @@ -79,48 +86,3 @@ class TuyaAPIConnection(object): query += "&sign=" + signature # print(query) return f"?{query}" - - def _build_request(self, method: str, hostname: str, requestline: str, body: str): - headers = { - "Host": hostname, - "User-Agent": "TUYA_IOT_SDK", - "Connection": "keep-alive" - } - - if body: - headers["Content-Type"] = "application/x-www-form-urlencoded; charset=UTF-8" - headers["Content-Length"] = str(len(body)) - - headers_formatted = "\r\n".join([f"{k}: {v}" for k, v in headers.items()]) - request = f"{method} {requestline} HTTP/1.1\r\n{headers_formatted}\r\n\r\n{body}" - return request.encode("utf-8") - - def _make_socket(self, host: str, port: int, encrypted=True): - csocket = socket.create_connection((host, port)) - if encrypted: - def x(hint): return self._psk_and_pskid(hint) - csocket = ssl.wrap_socket(csocket, ssl_version=ssl.PROTOCOL_TLSv1_2, ciphers='PSK-AES128-CBC-SHA256', psk=x) - return csocket - - def _psk_and_pskid(self, hint): - if not self.psk: - return self._psk_id_v1(hint) - return self._psk_id_v2(hint) - - def _psk_id_v1(self, hint): - authkey_hash = md5(self.authkey).digest() - uuid_hash = md5(self.uuid).digest() - rand_data = random.randbytes(0x10) - init_id = b'\x01' + rand_data + uuid_hash + b'_' + authkey_hash - init_id = init_id.replace(b'\x00', b'?') - iv = md5(init_id[1:]).digest() - key = md5(hint[-16:]).digest() - cipher = AES.new(key, AES.MODE_CBC, iv) - self.psk = cipher.encrypt(init_id[1:33]) - return (self.psk, init_id) - - def _psk_id_v2(self, hint): - uuid_hash = sha256(self.uuid).digest() - rand_data = random.randbytes(0x10) - init_id = b'\x02' + rand_data + uuid_hash - return (self.psk, init_id.replace(b'\x00', b'?'))