From f0a8ea415858fa1f21c58694d3be44c0d6ea590c Mon Sep 17 00:00:00 2001 From: Markus Birth Date: Sat, 19 Aug 2017 00:49:12 +0200 Subject: [PATCH] Complete re-implementation. --- tclcheck.py | 263 +++++++++++++++++++++++++++++++++------------------- 1 file changed, 170 insertions(+), 93 deletions(-) diff --git a/tclcheck.py b/tclcheck.py index d1fe37b..3f74586 100644 --- a/tclcheck.py +++ b/tclcheck.py @@ -1,121 +1,198 @@ #!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# pylint: disable=C0111,C0326 import hashlib import random import sys import time +import xml.dom.minidom +from collections import OrderedDict +from math import floor try: from defusedxml import ElementTree except (ImportError, AttributeError): from xml.etree import ElementTree import requests +class FotaCheck: + VDKEY = "1271941121281905392291845155542171963889169361242115412511417616616958244916823523421516924614377131161951402261451161002051042011757216713912611682532031591181861081836612643016596231212872211620511861302106446924625728571011411121471811641125920123641181975581511602312222261817375462445966911723844130106116313122624220514" -def prep_sess(): - sess = requests.Session() - sess.headers.update({"User-Agent": "com.tcl.fota/5.1.0.2.0029.0, Android"}) - return sess + MODE_OTA = 2 + MODE_FULL = 4 + RTD_ROOTED = 2 + RTD_UNROOTED = 1 + CHNL_WIFI = 2 + CHNL_3G = 1 + def __init__(self): + self.serid = "543212345000000" + self.curef = "PRD-63117-011" + self.fv = "AAM481" + self.osvs = "7.1.1" + self.mode = self.MODE_FULL + self.ftype = "Firmware" + self.cltp = 10 + self.cktp = 2 + self.rtd = self.RTD_UNROOTED + self.chnl = self.CHNL_WIFI + self.sess = requests.Session() + self.g2master = self.get_master_server() + #self.req.headers.update({"User-Agent": "com.tcl.fota/5.1.0.2.0029.0, Android"}) + self.sess.headers.update({"User-Agent": "tcl"}) -def salt(): - millis = round(time.time() * 1000) - tail = "{0:06d}".format(random.randint(0, 999999)) - return "{0}{1}".format(str(millis), tail) + @staticmethod + def get_salt(): + millis = floor(time.time() * 1000) + tail = "{:06d}".format(random.randint(0, 999999)) + return "{}{}".format(str(millis), tail) + @staticmethod + def get_master_server(): + master_servers = [ + "g2master-us-east.tclclouds.com", + "g2master-us-west.tclclouds.com", + "g2master-eu-west.tclclouds.com", + "g2master-ap-south.tclclouds.com", + "g2master-ap-north.tclclouds.com", + "g2master-sa-east.tclclouds.com", + ] + return random.choice(master_servers) -def check(sess, serid, curef, fv="AAM481", osvs="7.1.1", mode=4, ftype="Firmware", cltp=2010, cktp=2, rtd=1, chnl=2): - geturl = "http://g2master-us-east.tctmobile.com/check.php" - params = {"id": serid, "curef": curef, "fv": fv, "mode": mode, "type": ftype, "cltp": cltp, "cktp": cktp, "rtd": rtd, "chnl": chnl, "osvs": osvs} - req = sess.get(geturl, params=params) - if req.status_code == 200: - return(req.text) - else: - print(repr(req)) - print(repr(req.headers)) - print(repr(req.text)) - raise SystemExit + def do_check(self): + url = "https://" + self.g2master + "/check.php" + params = OrderedDict() + params["id"] = self.serid + params["curef"] = self.curef + params["fv"] = self.fv + params["mode"] = self.mode + params["type"] = self.ftype + params["cltp"] = self.cltp + params["cktp"] = self.cktp + params["rtd"] = self.rtd + params["chnl"] = self.chnl + req = self.sess.get(url, params=params) + if req.status_code == 200: + return req.text + else: + print(repr(req)) + print(repr(req.headers)) + print(repr(req.text)) + raise SystemExit -def update_request(sess, serid, curef, tv, fwid, salt, vkh, fv="AAM481", mode=4, ftype="Firmware", cltp=2010): - posturl = "http://g2master-us-east.tctmobile.com/download_request.php" - params = {"id": serid, "curef": curef, "fv": fv, "mode": mode, "type": ftype, "tv": tv, "fw_id": fwid, "salt": salt, "vk": vkh, "cltp": cltp} - req = sess.post(posturl, data=params) - if req.status_code == 200: - return req.text - else: - print(repr(req)) - print(repr(req.headers)) - print(repr(req.text)) - raise SystemExit + @staticmethod + def pretty_xml(xmlstr): + mdx = xml.dom.minidom.parseString(xmlstr) + return mdx.toprettyxml(indent=" ") + @staticmethod + def parse_check(xmlstr): + root = ElementTree.fromstring(xmlstr) + curef = root.find("CUREF").text + fv = root.find("VERSION").find("FV").text + tv = root.find("VERSION").find("TV").text + fwid = root.find("FIRMWARE").find("FW_ID").text + fileinfo = root.find("FIRMWARE").find("FILESET").find("FILE") + fileid = fileinfo.find("FILE_ID").text + filename = fileinfo.find("FILENAME").text + filesize = fileinfo.find("SIZE").text + filehash = fileinfo.find("CHECKSUM").text + return curef, fv, tv, fwid, fileid, filename, filesize, filehash -def getcode(sess, url): - req = sess.head(url) - return req.status_code + def get_vk2(self, params_dict): + params_dict["cltp"] = 10 + query = "" + for k, v in params_dict.items(): + if len(query) > 0: + query += "&" + query += k + "=" + str(v) + query += self.VDKEY + engine = hashlib.sha1() + engine.update(bytes(query, "utf-8")) + hexhash = engine.hexdigest() + return hexhash + ''' + private HashMap buildDownloadUrisParams(UpdatePackageInfo updatePackageInfo) { + FotaLog.m28v(TAG, "doAfterCheck"); + String salt = FotaUtil.salt(); + HashMap linkedHashMap = new LinkedHashMap(); + linkedHashMap.put("id", this.internalBuilder.getParam("id")); + linkedHashMap.put("salt", salt); + linkedHashMap.put("curef", updatePackageInfo.mCuref); + linkedHashMap.put("fv", updatePackageInfo.mFv); + linkedHashMap.put("tv", updatePackageInfo.mTv); + linkedHashMap.put("type", "Firmware"); + linkedHashMap.put("fw_id", updatePackageInfo.mFirmwareId); + linkedHashMap.put("mode", "2"); + linkedHashMap.put("vk", generateVk2((LinkedHashMap) linkedHashMap.clone())); + linkedHashMap.put("cltp", "10"); + linkedHashMap.put("cktp", this.internalBuilder.getParam("cktp")); + linkedHashMap.put("rtd", this.internalBuilder.getParam("rtd")); + linkedHashMap.put("chnl", this.internalBuilder.getParam("chnl")); + return linkedHashMap; + } + ''' -def vkhash(serid, curef, tv, fwid, salt, fv="AAM481", ftype="Firmware", mode=4, cltp=2010): - vdkey = "1271941121281905392291845155542171963889169361242115412511417616616958244916823523421516924614377131161951402261451161002051042011757216713912611682532031591181861081836612643016596231212872211620511861302106446924625728571011411121471811641125920123641181975581511602312222261817375462445966911723844130106116313122624220514" - query = "id={0}&salt={1}&curef={2}&fv={3}&tv={4}&type={5}&fw_id={6}&mode={7}&cltp={8}{9}".format(serid, salt, curef, fv, tv, ftype, fwid, mode, cltp, vdkey) - engine = hashlib.sha1() - engine.update(bytes(query, "utf-8")) - return engine.hexdigest() + def do_request(self, curef, fv, tv, fw_id): + url = "https://" + self.g2master + "/download_request.php" + params = OrderedDict() + params["id"] = self.serid + params["salt"] = self.get_salt() + params["curef"] = curef + params["fv"] = fv + params["tv"] = tv + params["type"] = self.ftype + params["fw_id"] = fw_id + params["mode"] = self.mode + params["vk"] = self.get_vk2(params) + params["cltp"] = self.cltp + params["cktp"] = self.cktp + params["rtd"] = self.rtd + params["chnl"] = self.chnl + #print(repr(dict(params))) + req = self.sess.post(url, data=params) + if req.status_code == 200: + return req.text + else: + print(repr(req)) + print(repr(req.headers)) + print(repr(req.text)) + raise SystemExit -def parse_check(body): - root = ElementTree.fromstring(body) - tv = root.find("VERSION").find("TV").text - fwid = root.find("FIRMWARE").find("FW_ID").text - fileinfo = root.find("FIRMWARE").find("FILESET").find("FILE") - filename = fileinfo.find("FILENAME").text - filesize = fileinfo.find("SIZE").text - filehash = fileinfo.find("CHECKSUM").text - return tv, fwid, filename, filesize, filehash - - -def parse_request(body): - root = ElementTree.fromstring(body) - slave = root.find("SLAVE_LIST").find("SLAVE").text - dlurl = root.find("FILE_LIST").find("FILE").find("DOWNLOAD_URL").text - return "http://{0}{1}".format(slave, dlurl) - - -def main2(sess, serid, curef, model="Unknown"): - checktext = check(sess, serid, curef) - tv, fwid, fn, fs, fh = parse_check(checktext) - print("{0}: {1} ({2})".format(curef, tv, model)) - -def main(sess, serid, curef): - checktext = check(sess, serid, curef) - print(repr(checktext)) - tv, fwid, filename, filesize, filehash = parse_check(checktext) - slt = salt() - vkh = vkhash(serid, curef, tv, fwid, slt) - updatetext = update_request(sess, serid, curef, tv, fwid, slt, vkh) - print(repr(updatetext)) - downloadurl = parse_request(updatetext) - print("{0}: HTTP {1}".format(filename, getcode(sess, downloadurl))) - print(downloadurl) + @staticmethod + def parse_request(xmlstr): + root = ElementTree.fromstring(xmlstr) + file = root.find("FILE_LIST").find("FILE") + fileid = file.find("FILE_ID").text + fileurl = file.find("DOWNLOAD_URL").text + slave_list = root.find("SLAVE_LIST").findall("SLAVE") + slaves = [] + for s in slave_list: + slaves.append(s.text) + return fileid, fileurl, slaves if __name__ == "__main__": - sess = prep_sess() - serid = "543212345000000" - if len(sys.argv) > 1: - if sys.argv[1] == "l": - with open("prds.txt", "r") as afile: - prdx = afile.read() - prds = list(filter(None, prdx.split("\n"))) - for prdline in prds: - prd, model = prdline.split(" ", 1) - try: - main2(sess, serid, prd, model) - except: - print("{} ({}) failed.".format(prd, model)) - continue - else: - curef = sys.argv[1] - main(sess, serid, curef) - else: - curef = "PRD-63764-001" - main(sess, serid, curef) + fc = FotaCheck() + #fc.serid = "3531510" + fc.curef = "PRD-63117-011" + fc.fv = "AAM481" + fc.osvs = "7.1.1" + fc.mode = fc.MODE_OTA + fc.cltp = 10 + fc.cktp = 2 + + check_xml = fc.do_check() + print(fc.pretty_xml(check_xml)) + curef, fv, tv, fwid, fileid, fn, fs, fh = fc.parse_check(check_xml) + + req_xml = fc.do_request(curef, fv, tv, fwid) + #print(fc.pretty_xml(req_xml)) + fileid, fileurl, slaves = fc.parse_request(req_xml) + + for s in slaves: + print("https://{}{}".format(s, fileurl))