profile-building - tuya_api_connection - Remove unused PSK code, update from socket to http request library

This commit is contained in:
Cossid
2025-01-12 12:18:39 -06:00
parent 32e7a8c3b3
commit 31c55f5cba

View File

@@ -1,59 +1,66 @@
import base64
import json
import random
import socket
import ssl
import requests
import sys
from hashlib import md5, sha256
import traceback
from hashlib import md5
from urllib.parse import urlparse
import Crypto.Util.Padding as padding
import ssl
from Crypto.Cipher import AES
class TuyaAPIConnection(object):
def __init__(self, uuid: str, auth_key: str, psk: str = None):
self.psk = psk.encode('utf-8') if psk else b''
def __init__(self, uuid: str, auth_key: str):
self.authkey = auth_key.encode('utf-8')
self.uuid = uuid.encode('utf-8')
def request(self, url: str, params: dict, data: dict, method: str = 'POST') -> dict:
parsed_url = urlparse(url)
hostname = parsed_url.hostname
psk_wrapped = parsed_url.scheme == "https"
port = parsed_url.port or (443 if psk_wrapped else 80)
querystring = self._build_querystring(params)
requestline = f"{parsed_url.path}{querystring}"
# print(requestline)
body = self._encrypt_data(data)
# print(data)
http_request = self._build_request(method, hostname, requestline, body)
with self._make_socket(hostname, port, psk_wrapped) as socket:
try:
socket.send(http_request)
datas = socket.recv(10000)
response_body = datas.split(b"\r\n\r\n")[1].decode("utf-8").strip()
# print(response_body)
response_body_json = None
response_body_json = json.loads(response_body)
result = response_body_json["result"]
result = base64.b64decode(result)
result = self._decrypt_data(result)
result = result.decode('utf-8')
result = json.loads(result)
# print(result)
return result
except Exception as exception:
print("[!] Unable to get a response from Tuya API, or response was malformed.")
print(f"[!] {url} must not be blocked in order to pull data from the Tuya API.")
if (response_body_json is not None and 'errorCode' in response_body_json):
print(f"[!] Error message: {response_body_json['errorCode']}")
else:
print(f"[!] Response Body: {response_body}")
print(f"[!] Exception: {exception}")
sys.exit(3)
# print(f"[+] Url: {url + querystring}")
# print(f"[+] Parameters: {params}")
# print(f"[+] Body: {body}")
# print(f"[+] Unencrypted Body: {data}")
try:
response_body = None
response_body_json = None
user_agent = "TUYA_IOT_SDK" # Older alternate: ESP8266SDK
headers = {"Host":f"{hostname}","User-Agent":f"{user_agent}","Connection":"keep-alive"}
match method.upper():
case "GET":
# print(f"[+] Headers: {headers}")
response = requests.get(url + querystring, headers=headers)
case "POST":
headers["Content-Type"] = "application/x-www-form-urlencoded; charset=UTF-8"
headers["Content-Length"] = str(len(body))
# print(f"[+] Headers: {headers}")
response = requests.post(url + querystring, data=body, headers=headers)
response_body = response.text.strip()
# print(response_body)
response_body_json = json.loads(response_body)
result = response_body_json["result"]
result = base64.b64decode(result)
result = self._decrypt_data(result)
result = result.decode('utf-8')
result = json.loads(result)
# print(result)
return result
except Exception as exception:
print(traceback.format_exc())
print("[!] Unable to get a response from Tuya API, or response was malformed.")
print(f"[!] {url} must not be blocked in order to pull data from the Tuya API.")
if (response_body_json is not None and 'errorCode' in response_body_json):
print(f"[!] Error message: {response_body_json['errorCode']}")
else:
print(f"[!] Response Body: {response_body}")
print(f"[!] Exception: {exception}")
# print(f"[!] Response: {response.text}")
sys.exit(3)
def _encrypt_data(self, data: dict):
jsondata = json.dumps(data, separators=(",", ":"))
@@ -79,48 +86,3 @@ class TuyaAPIConnection(object):
query += "&sign=" + signature
# print(query)
return f"?{query}"
def _build_request(self, method: str, hostname: str, requestline: str, body: str):
headers = {
"Host": hostname,
"User-Agent": "TUYA_IOT_SDK",
"Connection": "keep-alive"
}
if body:
headers["Content-Type"] = "application/x-www-form-urlencoded; charset=UTF-8"
headers["Content-Length"] = str(len(body))
headers_formatted = "\r\n".join([f"{k}: {v}" for k, v in headers.items()])
request = f"{method} {requestline} HTTP/1.1\r\n{headers_formatted}\r\n\r\n{body}"
return request.encode("utf-8")
def _make_socket(self, host: str, port: int, encrypted=True):
csocket = socket.create_connection((host, port))
if encrypted:
def x(hint): return self._psk_and_pskid(hint)
csocket = ssl.wrap_socket(csocket, ssl_version=ssl.PROTOCOL_TLSv1_2, ciphers='PSK-AES128-CBC-SHA256', psk=x)
return csocket
def _psk_and_pskid(self, hint):
if not self.psk:
return self._psk_id_v1(hint)
return self._psk_id_v2(hint)
def _psk_id_v1(self, hint):
authkey_hash = md5(self.authkey).digest()
uuid_hash = md5(self.uuid).digest()
rand_data = random.randbytes(0x10)
init_id = b'\x01' + rand_data + uuid_hash + b'_' + authkey_hash
init_id = init_id.replace(b'\x00', b'?')
iv = md5(init_id[1:]).digest()
key = md5(hint[-16:]).digest()
cipher = AES.new(key, AES.MODE_CBC, iv)
self.psk = cipher.encrypt(init_id[1:33])
return (self.psk, init_id)
def _psk_id_v2(self, hint):
uuid_hash = sha256(self.uuid).digest()
rand_data = random.randbytes(0x10)
init_id = b'\x02' + rand_data + uuid_hash
return (self.psk, init_id.replace(b'\x00', b'?'))