# tcl_ota_check/tcllib.py -- mirror of https://github.com/mbirth/tcl_ota_check.git
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326
import argparse
import base64
import binascii
import enum
import errno
import glob
import hashlib
import json
import os
import platform
import random
import sys
import time
import webbrowser
import xml.dom.minidom
import zlib
from collections import OrderedDict
from math import floor
import numpy
import requests
from defusedxml import ElementTree
DEVICELIST_URL = "https://tclota.birth-online.de/json_lastupdates.php"
DEVICELIST_FILE = "prds.json"
DEVICELIST_CACHE_SECONDS = 86400
ANSI_UP_DEL = u"\u001b[F\u001b[K"
ANSI_BLACK = u"\u001b[0;30m"
ANSI_RED_DARK = u"\u001b[0;31m"
ANSI_GREEN_DARK = u"\u001b[0;32m"
ANSI_YELLOW_DARK = u"\u001b[0;33m"
ANSI_CYAN_DARK = u"\u001b[0;36m"
ANSI_SILVER = u"\u001b[0;37m"
ANSI_GREY = u"\u001b[1;30m"
ANSI_RED = u"\u001b[1;31m"
ANSI_GREEN = u"\u001b[1;32m"
ANSI_YELLOW = u"\u001b[1;33m"
ANSI_CYAN = u"\u001b[1;36m"
ANSI_WHITE = u"\u001b[1;37m"
ANSI_RESET = u"\u001b[0m"
def make_escapes_work():
system = platform.system()
if system == "Windows":
try:
import colorama
except ImportError:
pass
else:
colorama.init()
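
# (On platforms other than Windows the ANSI escapes above work natively, so
# make_escapes_work() is a no-op there; on Windows it enables them via
# colorama when that package is installed.)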
def default_enum(enumname, vardict):
return enum.IntEnum(enumname, vardict, module=__name__, qualname="tcllib.FotaCheck.{}".format(enumname))
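
# Illustrative sketch (not from the original source): default_enum() simply
# builds an IntEnum whose module/qualname point at tcllib.FotaCheck so the
# members pickle cleanly, e.g.
#
#     MODE = default_enum("MODE", {"OTA": 2, "FULL": 4})
#     assert MODE.FULL.value == 4
#     assert MODE(2) is MODE.OTA
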
class DefaultParser(argparse.ArgumentParser):
def __init__(self, appname, desc=None):
super().__init__(prog=appname, description=desc, epilog="https://github.com/mbirth/tcl_ota_check")
self.add_argument("--webdb", help="open web database in browser and exit", action="store_true")
    def parse_args(self, args=None, namespace=None):
        args = sys.argv[1:] if args is None else list(args)
        if "--webdb" in args:  # handle --webdb before any other argument validation
            webbrowser.open("https://tclota.birth-online.de/", new=2)
            raise SystemExit
        return super().parse_args(args, namespace)
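
    # Usage sketch (assumed calling convention; the script name and the "prd"
    # argument are hypothetical):
    #
    #     dp = DefaultParser("example_check.py")
    #     dp.add_argument("prd", nargs="?", default="PRD-63117-011")
    #     args = dp.parse_args(sys.argv[1:])
    #
    # Passing "--webdb" anywhere on the command line opens the web database
    # and exits before normal argument parsing happens.
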
class FotaCheck:
VDKEY = b"eJwdjwEOwDAIAr8kKFr//7HhmqXp8AIIDrYAgg8byiUXrwRJRXja+d6iNxu0AhUooDCN9rd6rDLxmGIakUVWo3IGCTRWqCAt6X4jGEIUAxgN0eYWnp+LkpHQAg/PsO90ELsy0Npm/n2HbtPndFgGEV31R9OmT4O4nrddjc3Qt6nWscx7e+WRHq5UnOudtjw5skuV09pFhvmqnOEIs4ljPeel1wfLYUF4\n"
CKTP = default_enum("CKTP", ["AUTO", "MANUAL"])
MODE = default_enum("MODE", {"OTA": 2, "FULL": 4})
RTD = default_enum("RTD", ["UNROOTED", "ROOTED"])
CHNL = default_enum("CHNL", ["3G", "WIFI"])
CLTP = default_enum("CLTP", {"MOBILE": 10, "DESKTOP": 2010})
CKOT = default_enum("CKOT", ["ALL", "AOTA_ONLY", "FOTA_ONLY"])
def __init__(self):
self.serid = "543212345000000"
self.curef = "PRD-63117-011"
self.fv = "AAM481"
self.osvs = "7.1.1"
self.mode = self.MODE.FULL
self.ftype = "Firmware"
self.cltp = self.CLTP.MOBILE
self.cktp = self.CKTP.MANUAL
self.ckot = self.CKOT.ALL
self.rtd = self.RTD.UNROOTED
self.chnl = self.CHNL.WIFI
self.g2master = None
self.master_servers = [
"g2master-us-east.tclclouds.com",
"g2master-us-west.tclclouds.com",
"g2master-eu-west.tclclouds.com",
"g2master-ap-south.tclclouds.com",
"g2master-ap-north.tclclouds.com",
"g2master-sa-east.tclclouds.com",
]
self.master_servers_weights = [3] * len(self.master_servers)
self.check_time_sum = 3
self.check_time_count = 1
self.last_dump_filename = None
self.reset_session()
def reset_session(self):
self.g2master = self.get_master_server()
self.sess = requests.Session()
if self.mode == self.MODE.FULL:
self.sess.headers.update({"User-Agent": "com.tcl.fota/5.1.0.2.0029.0, Android"})
else:
self.sess.headers.update({"User-Agent": "tcl"})
return self.sess
@staticmethod
def get_salt():
millis = floor(time.time() * 1000)
tail = "{:06d}".format(random.randint(0, 999999))
return "{}{}".format(str(millis), tail)
def get_master_server(self):
weight_sum = 0
for i in self.master_servers_weights:
weight_sum += i
numpy_weights = []
for i in self.master_servers_weights:
numpy_weights.append(i/weight_sum)
return numpy.random.choice(self.master_servers, p=numpy_weights)
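        # (Illustration of the normalisation above: with weights [3, 3, 6] the
        # probabilities passed to numpy.random.choice() are [0.25, 0.25, 0.5],
        # so recently well-behaved servers are picked more often.)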
def master_server_downvote(self):
idx = self.master_servers.index(self.g2master)
if self.master_servers_weights[idx] > 1:
self.master_servers_weights[idx] -= 1
def master_server_upvote(self):
idx = self.master_servers.index(self.g2master)
if self.master_servers_weights[idx] < 10:
self.master_servers_weights[idx] += 1
def check_time_add(self, duration):
self.check_time_sum += duration
self.check_time_count += 1
def check_time_avg(self):
return (self.check_time_sum / self.check_time_count)
def master_server_vote_on_time(self, last_duration, avg_duration):
if last_duration < avg_duration - 0.5:
self.master_server_upvote()
elif last_duration > avg_duration + 0.5:
self.master_server_downvote()
def write_dump(self, data):
outfile = os.path.normpath("logs/{}.xml".format(self.get_salt()))
if not os.path.exists(os.path.dirname(outfile)):
try:
os.makedirs(os.path.dirname(outfile))
except OSError as e:
if e.errno != errno.EEXIST:
raise
with open(outfile, "w", encoding="utf-8") as f:
f.write(data)
self.last_dump_filename = outfile
def delete_last_dump(self):
if self.last_dump_filename:
os.unlink(self.last_dump_filename)
self.last_dump_filename = None
@staticmethod
def write_info_if_dumps_found():
# To disable this info, uncomment the following line.
#return
files = glob.glob(os.path.normpath("logs/*.xml"))
if len(files) > 0:
print()
print("{}There are {} logs collected in the logs/ directory.{} Please consider uploading".format(ANSI_YELLOW, len(files), ANSI_RESET))
print("them to https://tclota.birth-online.de/ by running {}./upload_logs.py{}.".format(ANSI_CYAN, ANSI_RESET))
def do_check(self, https=True, timeout=10, max_tries=5):
protocol = "https://" if https else "http://"
url = protocol + self.g2master + "/check.php"
params = OrderedDict()
params["id"] = self.serid
params["curef"] = self.curef
params["fv"] = self.fv
params["mode"] = self.mode.value
params["type"] = self.ftype
params["cltp"] = self.cltp.value
params["cktp"] = self.cktp.value
params["rtd"] = self.rtd.value
params["chnl"] = self.chnl.value
#params["osvs"] = self.osvs
#params["ckot"] = self.ckot.value
last_response = None
for num_try in range(0, max_tries):
try:
reqtime_start = time.perf_counter()
req = self.sess.get(url, params=params, timeout=timeout)
reqtime = time.perf_counter() - reqtime_start
reqtime_avg = self.check_time_avg()
self.check_time_add(reqtime)
last_response = req
if req.status_code == 200:
self.master_server_vote_on_time(reqtime, reqtime_avg)
req.encoding = "utf-8" # Force encoding as server doesn't give one
self.write_dump(req.text)
return req.text
elif req.status_code == 204:
self.master_server_vote_on_time(reqtime, reqtime_avg)
raise requests.exceptions.HTTPError("No update available.", response=req)
elif req.status_code == 404:
self.master_server_vote_on_time(reqtime, reqtime_avg)
raise requests.exceptions.HTTPError("No data for requested CUREF/FV combination.", response=req)
elif req.status_code not in [500, 502, 503]:
self.master_server_downvote()
req.raise_for_status()
raise requests.exceptions.HTTPError("HTTP {}.".format(req.status_code), response=req)
except requests.exceptions.Timeout:
pass
# Something went wrong, try a different server
self.master_server_downvote()
self.g2master = self.get_master_server()
protocol = "https://" if https else "http://"
url = protocol + self.g2master + "/check.php"
raise requests.exceptions.RetryError("Max tries ({}) reached.".format(max_tries), response=last_response)
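
    # Usage sketch (assumed flow; names like "fc" are hypothetical):
    #
    #     fc = FotaCheck()
    #     fc.curef = "PRD-63117-011"
    #     fc.mode = fc.MODE.FULL
    #     check_xml = fc.do_check()
    #     curef, fv, tv, fw_id, fileid, fn, fsize, fhash = fc.parse_check(check_xml)
    #
    # A 204 ("no update") or 404 response surfaces as requests.exceptions.HTTPError.
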
@staticmethod
def pretty_xml(xmlstr):
mdx = xml.dom.minidom.parseString(xmlstr)
return mdx.toprettyxml(indent=" ")
@staticmethod
def parse_check(xmlstr):
root = ElementTree.fromstring(xmlstr)
curef = root.find("CUREF").text
fv = root.find("VERSION").find("FV").text
tv = root.find("VERSION").find("TV").text
fw_id = root.find("FIRMWARE").find("FW_ID").text
fileinfo = root.find("FIRMWARE").find("FILESET").find("FILE")
fileid = fileinfo.find("FILE_ID").text
filename = fileinfo.find("FILENAME").text
filesize = fileinfo.find("SIZE").text
filehash = fileinfo.find("CHECKSUM").text
return curef, fv, tv, fw_id, fileid, filename, filesize, filehash
def get_vk2(self, params_dict, cltp):
params_dict["cltp"] = cltp
query = ""
for k, v in params_dict.items():
if len(query) > 0:
query += "&"
query += k + "=" + str(v)
vdk = zlib.decompress(binascii.a2b_base64(self.VDKEY))
query += vdk.decode("utf-8")
engine = hashlib.sha1()
engine.update(bytes(query, "utf-8"))
hexhash = engine.hexdigest()
return hexhash
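        # (Sketch of the scheme above: the ordered request parameters are
        # joined into "k1=v1&k2=v2&...&cltp=<cltp>", the decompressed VDKEY
        # text is appended, and the SHA1 hex digest of that string becomes
        # the "vk" request parameter used by do_request().)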
@staticmethod
def get_devicelist(force=False, output_diff=True):
need_download = True
old_prds = None
try:
filestat = os.stat(DEVICELIST_FILE)
filemtime = filestat.st_mtime
if filemtime > time.time() - DEVICELIST_CACHE_SECONDS:
need_download = False
with open(DEVICELIST_FILE, "rt") as df:
old_prds = json.load(df)
except FileNotFoundError:
pass
if need_download or force:
prds_json = requests.get(DEVICELIST_URL).text
with open(DEVICELIST_FILE, "wt") as df:
df.write(prds_json)
with open(DEVICELIST_FILE, "rt") as df:
prds = json.load(df)
if old_prds and output_diff:
FotaCheck.print_prd_diff(old_prds, prds)
return prds
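        # Usage sketch (illustrative):
        #
        #     prds = FotaCheck.get_devicelist()
        #     for prd, pdata in prds.items():
        #         print(prd, pdata["last_full"], pdata["last_ota"])
        #
        # The cached copy in prds.json is refreshed at most once per
        # DEVICELIST_CACHE_SECONDS unless force=True.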
@staticmethod
def print_prd_diff(old_prds, new_prds):
added_prds = [prd for prd in new_prds if prd not in old_prds]
removed_prds = [prd for prd in old_prds if prd not in new_prds]
for prd in removed_prds:
print("> Removed device {} (was at {} / OTA: {}).".format(ANSI_RED + prd + ANSI_RESET, old_prds[prd]["last_full"], old_prds[prd]["last_ota"]))
for prd in added_prds:
print("> New device {} ({} / OTA: {}).".format(ANSI_GREEN + prd + ANSI_RESET, new_prds[prd]["last_full"], new_prds[prd]["last_ota"]))
for prd, pdata in new_prds.items():
if prd in added_prds:
continue
odata = old_prds[prd]
if pdata["last_full"] != odata["last_full"] and pdata["last_ota"] != odata["last_ota"]:
print("> {}: {}{} (OTA: {}{})".format(
prd,
ANSI_CYAN_DARK + str(odata["last_full"]) + ANSI_RESET,
ANSI_CYAN + str(pdata["last_full"]) + ANSI_RESET,
ANSI_YELLOW_DARK + str(odata["last_ota"]) + ANSI_RESET,
ANSI_YELLOW + str(pdata["last_ota"]) + ANSI_RESET
))
elif pdata["last_full"] != odata["last_full"]:
print("> {}: {}{} (FULL)".format(prd, ANSI_CYAN_DARK + str(odata["last_full"]) + ANSI_RESET, ANSI_CYAN + str(pdata["last_full"]) + ANSI_RESET))
elif pdata["last_ota"] != odata["last_ota"]:
print("> {}: {}{} (OTA)".format(prd, ANSI_YELLOW_DARK + str(odata["last_ota"]) + ANSI_RESET, ANSI_YELLOW + str(pdata["last_ota"]) + ANSI_RESET))
@staticmethod
def get_creds():
creds = {
b"YWNjb3VudA==": b"emhlbmdodWEuZ2Fv",
b"cGFzc3dvcmQ=": b"cWFydUQ0b2s=",
}
params = {base64.b64decode(key): base64.b64decode(val) for key, val in creds.items()}
return params
@staticmethod
def get_creds2():
creds = {
b"YWNjb3VudA==": b"VGVsZUV4dFRlc3Q=",
b"cGFzc3dvcmQ=": b"dDA1MjM=",
}
params = {base64.b64decode(key): base64.b64decode(val) for key, val in creds.items()}
return params
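
    # (Note on the two helpers above: the base64 keys decode to the form
    # field names b"account" and b"password"; only the credential values
    # differ between get_creds() and get_creds2().)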
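    # The block below is decompiled Java kept for reference; it documents the
    # parameter order that do_request() mirrors.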
'''
private HashMap<String, String> buildDownloadUrisParams(UpdatePackageInfo updatePackageInfo) {
FotaLog.m28v(TAG, "doAfterCheck");
String salt = FotaUtil.salt();
HashMap linkedHashMap = new LinkedHashMap();
linkedHashMap.put("id", this.internalBuilder.getParam("id"));
linkedHashMap.put("salt", salt);
linkedHashMap.put("curef", updatePackageInfo.mCuref);
linkedHashMap.put("fv", updatePackageInfo.mFv);
linkedHashMap.put("tv", updatePackageInfo.mTv);
linkedHashMap.put("type", "Firmware");
linkedHashMap.put("fw_id", updatePackageInfo.mFirmwareId);
linkedHashMap.put("mode", "2");
linkedHashMap.put("vk", generateVk2((LinkedHashMap) linkedHashMap.clone()));
linkedHashMap.put("cltp", "10");
linkedHashMap.put("cktp", this.internalBuilder.getParam("cktp"));
linkedHashMap.put("rtd", this.internalBuilder.getParam("rtd"));
linkedHashMap.put("chnl", this.internalBuilder.getParam("chnl"));
return linkedHashMap;
}
'''
def do_request(self, curef, fv, tv, fw_id):
url = "https://" + self.g2master + "/download_request.php"
params = OrderedDict()
params["id"] = self.serid
params["salt"] = self.get_salt()
params["curef"] = curef
params["fv"] = fv
params["tv"] = tv
params["type"] = self.ftype
params["fw_id"] = fw_id
params["mode"] = self.mode.value
params["vk"] = self.get_vk2(params, self.cltp.value)
params["cltp"] = self.cltp.value
params["cktp"] = self.cktp.value
params["rtd"] = self.rtd.value
if self.mode == self.MODE.FULL:
params["foot"] = 1
params["chnl"] = self.chnl.value
#print(repr(dict(params)))
req = self.sess.post(url, data=params)
if req.status_code == 200:
req.encoding = "utf-8" # Force encoding as server doesn't give one
self.write_dump(req.text)
return req.text
else:
print("REQUEST: " + repr(req))
print(repr(req.headers))
print(repr(req.text))
raise SystemExit
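
    # Continuing the usage sketch from do_check() above (names hypothetical;
    # the URL assembly on the last line is an assumption about how the slave
    # host and the returned path are combined):
    #
    #     req_xml = fc.do_request(curef, fv, tv, fw_id)
    #     fileid, fileurl, slaves, encslaves, s3_url, s3_slaves = fc.parse_request(req_xml)
    #     download_url = "http://" + random.choice(slaves) + fileurl
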
@staticmethod
def parse_request(xmlstr):
root = ElementTree.fromstring(xmlstr)
file = root.find("FILE_LIST").find("FILE")
fileid = file.find("FILE_ID").text
fileurl = file.find("DOWNLOAD_URL").text
        s3_fileurl_node = file.find("S3_DOWNLOAD_URL")
        s3_fileurl = ""
        if s3_fileurl_node is not None:  # Element truthiness is False for childless nodes
            s3_fileurl = s3_fileurl_node.text
slave_list = root.find("SLAVE_LIST").findall("SLAVE")
enc_list = root.find("SLAVE_LIST").findall("ENCRYPT_SLAVE")
s3_slave_list = root.find("SLAVE_LIST").findall("S3_SLAVE")
slaves = [s.text for s in slave_list]
encslaves = [s.text for s in enc_list]
s3_slaves = [s.text for s in s3_slave_list]
return fileid, fileurl, slaves, encslaves, s3_fileurl, s3_slaves
def do_encrypt_header(self, encslave, address):
params = self.get_creds2()
params[b"address"] = bytes(address, "utf-8")
url = "http://" + encslave + "/encrypt_header.php"
req = self.sess.post(url, data=params, verify=False)
# Expect "HTTP 206 Partial Content" response
if req.status_code == 206:
return req.content
else:
print("ENCRYPT: " + repr(req))
print(repr(req.headers))
print(repr(req.text))
raise SystemExit
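
    # Usage sketch (assumed): for FULL downloads the blob returned above is an
    # encrypted file header fetched from one of the "encslaves" hosts, e.g.
    #
    #     header = fc.do_encrypt_header(random.choice(encslaves), fileurl)
    #
    # Callers are expected to prepend it to the downloaded payload; that last
    # step is an assumption about the surrounding tooling, not part of this file.
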
def do_checksum(self, encslave, address, uri):
url = "http://" + encslave + "/checksum.php"
params = self.get_creds2()
payload = {address: uri}
payload_json = json.dumps(payload)
params[b"address"] = bytes(payload_json, "utf-8")
#print(repr(dict(params)))
req = self.sess.post(url, data=params)
if req.status_code == 200:
req.encoding = "utf-8" # Force encoding as server doesn't give one
self.write_dump(req.text)
# <ENCRYPT_FOOTER>2abfa6f6507044fec995efede5d818e62a0b19b5</ENCRYPT_FOOTER> means ERROR (invalid ADDRESS!)
return req.text
else:
print("CHECKSUM: " + repr(req))
print(repr(req.headers))
print(repr(req.text))
raise SystemExit
@staticmethod
def parse_checksum(xmlstr):
root = ElementTree.fromstring(xmlstr)
file = root.find("FILE_CHECKSUM_LIST").find("FILE")
file_addr = file.find("ADDRESS").text
sha1_enc_footer = file.find("ENCRYPT_FOOTER").text
sha1_footer = file.find("FOOTER").text
sha1_body = file.find("BODY").text
return file_addr, sha1_body, sha1_enc_footer, sha1_footer
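
    # Usage sketch (assumed; passing the download path for both "address" and
    # "uri" is only an illustration):
    #
    #     chk_xml = fc.do_checksum(random.choice(encslaves), fileurl, fileurl)
    #     addr, sha1_body, sha1_enc_footer, sha1_footer = fc.parse_checksum(chk_xml)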