mirror of
https://github.com/mbirth/tcl_ota_check.git
synced 2024-11-09 22:06:47 +00:00
Complete re-implementation.
This commit is contained in:
parent
f08ddd44a1
commit
f0a8ea4158
263
tclcheck.py
263
tclcheck.py
@ -1,121 +1,198 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# pylint: disable=C0111,C0326
|
||||
|
||||
import hashlib
|
||||
import random
|
||||
import sys
|
||||
import time
|
||||
import xml.dom.minidom
|
||||
from collections import OrderedDict
|
||||
from math import floor
|
||||
try:
|
||||
from defusedxml import ElementTree
|
||||
except (ImportError, AttributeError):
|
||||
from xml.etree import ElementTree
|
||||
import requests
|
||||
|
||||
class FotaCheck:
|
||||
VDKEY = "1271941121281905392291845155542171963889169361242115412511417616616958244916823523421516924614377131161951402261451161002051042011757216713912611682532031591181861081836612643016596231212872211620511861302106446924625728571011411121471811641125920123641181975581511602312222261817375462445966911723844130106116313122624220514"
|
||||
|
||||
def prep_sess():
|
||||
sess = requests.Session()
|
||||
sess.headers.update({"User-Agent": "com.tcl.fota/5.1.0.2.0029.0, Android"})
|
||||
return sess
|
||||
MODE_OTA = 2
|
||||
MODE_FULL = 4
|
||||
RTD_ROOTED = 2
|
||||
RTD_UNROOTED = 1
|
||||
CHNL_WIFI = 2
|
||||
CHNL_3G = 1
|
||||
|
||||
def __init__(self):
|
||||
self.serid = "543212345000000"
|
||||
self.curef = "PRD-63117-011"
|
||||
self.fv = "AAM481"
|
||||
self.osvs = "7.1.1"
|
||||
self.mode = self.MODE_FULL
|
||||
self.ftype = "Firmware"
|
||||
self.cltp = 10
|
||||
self.cktp = 2
|
||||
self.rtd = self.RTD_UNROOTED
|
||||
self.chnl = self.CHNL_WIFI
|
||||
self.sess = requests.Session()
|
||||
self.g2master = self.get_master_server()
|
||||
#self.req.headers.update({"User-Agent": "com.tcl.fota/5.1.0.2.0029.0, Android"})
|
||||
self.sess.headers.update({"User-Agent": "tcl"})
|
||||
|
||||
def salt():
|
||||
millis = round(time.time() * 1000)
|
||||
tail = "{0:06d}".format(random.randint(0, 999999))
|
||||
return "{0}{1}".format(str(millis), tail)
|
||||
@staticmethod
|
||||
def get_salt():
|
||||
millis = floor(time.time() * 1000)
|
||||
tail = "{:06d}".format(random.randint(0, 999999))
|
||||
return "{}{}".format(str(millis), tail)
|
||||
|
||||
@staticmethod
|
||||
def get_master_server():
|
||||
master_servers = [
|
||||
"g2master-us-east.tclclouds.com",
|
||||
"g2master-us-west.tclclouds.com",
|
||||
"g2master-eu-west.tclclouds.com",
|
||||
"g2master-ap-south.tclclouds.com",
|
||||
"g2master-ap-north.tclclouds.com",
|
||||
"g2master-sa-east.tclclouds.com",
|
||||
]
|
||||
return random.choice(master_servers)
|
||||
|
||||
def check(sess, serid, curef, fv="AAM481", osvs="7.1.1", mode=4, ftype="Firmware", cltp=2010, cktp=2, rtd=1, chnl=2):
|
||||
geturl = "http://g2master-us-east.tctmobile.com/check.php"
|
||||
params = {"id": serid, "curef": curef, "fv": fv, "mode": mode, "type": ftype, "cltp": cltp, "cktp": cktp, "rtd": rtd, "chnl": chnl, "osvs": osvs}
|
||||
req = sess.get(geturl, params=params)
|
||||
if req.status_code == 200:
|
||||
return(req.text)
|
||||
else:
|
||||
print(repr(req))
|
||||
print(repr(req.headers))
|
||||
print(repr(req.text))
|
||||
raise SystemExit
|
||||
def do_check(self):
|
||||
url = "https://" + self.g2master + "/check.php"
|
||||
params = OrderedDict()
|
||||
params["id"] = self.serid
|
||||
params["curef"] = self.curef
|
||||
params["fv"] = self.fv
|
||||
params["mode"] = self.mode
|
||||
params["type"] = self.ftype
|
||||
params["cltp"] = self.cltp
|
||||
params["cktp"] = self.cktp
|
||||
params["rtd"] = self.rtd
|
||||
params["chnl"] = self.chnl
|
||||
|
||||
req = self.sess.get(url, params=params)
|
||||
if req.status_code == 200:
|
||||
return req.text
|
||||
else:
|
||||
print(repr(req))
|
||||
print(repr(req.headers))
|
||||
print(repr(req.text))
|
||||
raise SystemExit
|
||||
|
||||
def update_request(sess, serid, curef, tv, fwid, salt, vkh, fv="AAM481", mode=4, ftype="Firmware", cltp=2010):
|
||||
posturl = "http://g2master-us-east.tctmobile.com/download_request.php"
|
||||
params = {"id": serid, "curef": curef, "fv": fv, "mode": mode, "type": ftype, "tv": tv, "fw_id": fwid, "salt": salt, "vk": vkh, "cltp": cltp}
|
||||
req = sess.post(posturl, data=params)
|
||||
if req.status_code == 200:
|
||||
return req.text
|
||||
else:
|
||||
print(repr(req))
|
||||
print(repr(req.headers))
|
||||
print(repr(req.text))
|
||||
raise SystemExit
|
||||
@staticmethod
|
||||
def pretty_xml(xmlstr):
|
||||
mdx = xml.dom.minidom.parseString(xmlstr)
|
||||
return mdx.toprettyxml(indent=" ")
|
||||
|
||||
@staticmethod
|
||||
def parse_check(xmlstr):
|
||||
root = ElementTree.fromstring(xmlstr)
|
||||
curef = root.find("CUREF").text
|
||||
fv = root.find("VERSION").find("FV").text
|
||||
tv = root.find("VERSION").find("TV").text
|
||||
fwid = root.find("FIRMWARE").find("FW_ID").text
|
||||
fileinfo = root.find("FIRMWARE").find("FILESET").find("FILE")
|
||||
fileid = fileinfo.find("FILE_ID").text
|
||||
filename = fileinfo.find("FILENAME").text
|
||||
filesize = fileinfo.find("SIZE").text
|
||||
filehash = fileinfo.find("CHECKSUM").text
|
||||
return curef, fv, tv, fwid, fileid, filename, filesize, filehash
|
||||
|
||||
def getcode(sess, url):
|
||||
req = sess.head(url)
|
||||
return req.status_code
|
||||
def get_vk2(self, params_dict):
|
||||
params_dict["cltp"] = 10
|
||||
query = ""
|
||||
for k, v in params_dict.items():
|
||||
if len(query) > 0:
|
||||
query += "&"
|
||||
query += k + "=" + str(v)
|
||||
query += self.VDKEY
|
||||
engine = hashlib.sha1()
|
||||
engine.update(bytes(query, "utf-8"))
|
||||
hexhash = engine.hexdigest()
|
||||
return hexhash
|
||||
|
||||
'''
|
||||
private HashMap<String, String> buildDownloadUrisParams(UpdatePackageInfo updatePackageInfo) {
|
||||
FotaLog.m28v(TAG, "doAfterCheck");
|
||||
String salt = FotaUtil.salt();
|
||||
HashMap linkedHashMap = new LinkedHashMap();
|
||||
linkedHashMap.put("id", this.internalBuilder.getParam("id"));
|
||||
linkedHashMap.put("salt", salt);
|
||||
linkedHashMap.put("curef", updatePackageInfo.mCuref);
|
||||
linkedHashMap.put("fv", updatePackageInfo.mFv);
|
||||
linkedHashMap.put("tv", updatePackageInfo.mTv);
|
||||
linkedHashMap.put("type", "Firmware");
|
||||
linkedHashMap.put("fw_id", updatePackageInfo.mFirmwareId);
|
||||
linkedHashMap.put("mode", "2");
|
||||
linkedHashMap.put("vk", generateVk2((LinkedHashMap) linkedHashMap.clone()));
|
||||
linkedHashMap.put("cltp", "10");
|
||||
linkedHashMap.put("cktp", this.internalBuilder.getParam("cktp"));
|
||||
linkedHashMap.put("rtd", this.internalBuilder.getParam("rtd"));
|
||||
linkedHashMap.put("chnl", this.internalBuilder.getParam("chnl"));
|
||||
return linkedHashMap;
|
||||
}
|
||||
'''
|
||||
|
||||
def vkhash(serid, curef, tv, fwid, salt, fv="AAM481", ftype="Firmware", mode=4, cltp=2010):
|
||||
vdkey = "1271941121281905392291845155542171963889169361242115412511417616616958244916823523421516924614377131161951402261451161002051042011757216713912611682532031591181861081836612643016596231212872211620511861302106446924625728571011411121471811641125920123641181975581511602312222261817375462445966911723844130106116313122624220514"
|
||||
query = "id={0}&salt={1}&curef={2}&fv={3}&tv={4}&type={5}&fw_id={6}&mode={7}&cltp={8}{9}".format(serid, salt, curef, fv, tv, ftype, fwid, mode, cltp, vdkey)
|
||||
engine = hashlib.sha1()
|
||||
engine.update(bytes(query, "utf-8"))
|
||||
return engine.hexdigest()
|
||||
def do_request(self, curef, fv, tv, fw_id):
|
||||
url = "https://" + self.g2master + "/download_request.php"
|
||||
params = OrderedDict()
|
||||
params["id"] = self.serid
|
||||
params["salt"] = self.get_salt()
|
||||
params["curef"] = curef
|
||||
params["fv"] = fv
|
||||
params["tv"] = tv
|
||||
params["type"] = self.ftype
|
||||
params["fw_id"] = fw_id
|
||||
params["mode"] = self.mode
|
||||
params["vk"] = self.get_vk2(params)
|
||||
params["cltp"] = self.cltp
|
||||
params["cktp"] = self.cktp
|
||||
params["rtd"] = self.rtd
|
||||
params["chnl"] = self.chnl
|
||||
|
||||
#print(repr(dict(params)))
|
||||
req = self.sess.post(url, data=params)
|
||||
if req.status_code == 200:
|
||||
return req.text
|
||||
else:
|
||||
print(repr(req))
|
||||
print(repr(req.headers))
|
||||
print(repr(req.text))
|
||||
raise SystemExit
|
||||
|
||||
def parse_check(body):
|
||||
root = ElementTree.fromstring(body)
|
||||
tv = root.find("VERSION").find("TV").text
|
||||
fwid = root.find("FIRMWARE").find("FW_ID").text
|
||||
fileinfo = root.find("FIRMWARE").find("FILESET").find("FILE")
|
||||
filename = fileinfo.find("FILENAME").text
|
||||
filesize = fileinfo.find("SIZE").text
|
||||
filehash = fileinfo.find("CHECKSUM").text
|
||||
return tv, fwid, filename, filesize, filehash
|
||||
|
||||
|
||||
def parse_request(body):
|
||||
root = ElementTree.fromstring(body)
|
||||
slave = root.find("SLAVE_LIST").find("SLAVE").text
|
||||
dlurl = root.find("FILE_LIST").find("FILE").find("DOWNLOAD_URL").text
|
||||
return "http://{0}{1}".format(slave, dlurl)
|
||||
|
||||
|
||||
def main2(sess, serid, curef, model="Unknown"):
|
||||
checktext = check(sess, serid, curef)
|
||||
tv, fwid, fn, fs, fh = parse_check(checktext)
|
||||
print("{0}: {1} ({2})".format(curef, tv, model))
|
||||
|
||||
def main(sess, serid, curef):
|
||||
checktext = check(sess, serid, curef)
|
||||
print(repr(checktext))
|
||||
tv, fwid, filename, filesize, filehash = parse_check(checktext)
|
||||
slt = salt()
|
||||
vkh = vkhash(serid, curef, tv, fwid, slt)
|
||||
updatetext = update_request(sess, serid, curef, tv, fwid, slt, vkh)
|
||||
print(repr(updatetext))
|
||||
downloadurl = parse_request(updatetext)
|
||||
print("{0}: HTTP {1}".format(filename, getcode(sess, downloadurl)))
|
||||
print(downloadurl)
|
||||
@staticmethod
|
||||
def parse_request(xmlstr):
|
||||
root = ElementTree.fromstring(xmlstr)
|
||||
file = root.find("FILE_LIST").find("FILE")
|
||||
fileid = file.find("FILE_ID").text
|
||||
fileurl = file.find("DOWNLOAD_URL").text
|
||||
slave_list = root.find("SLAVE_LIST").findall("SLAVE")
|
||||
slaves = []
|
||||
for s in slave_list:
|
||||
slaves.append(s.text)
|
||||
return fileid, fileurl, slaves
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sess = prep_sess()
|
||||
serid = "543212345000000"
|
||||
if len(sys.argv) > 1:
|
||||
if sys.argv[1] == "l":
|
||||
with open("prds.txt", "r") as afile:
|
||||
prdx = afile.read()
|
||||
prds = list(filter(None, prdx.split("\n")))
|
||||
for prdline in prds:
|
||||
prd, model = prdline.split(" ", 1)
|
||||
try:
|
||||
main2(sess, serid, prd, model)
|
||||
except:
|
||||
print("{} ({}) failed.".format(prd, model))
|
||||
continue
|
||||
else:
|
||||
curef = sys.argv[1]
|
||||
main(sess, serid, curef)
|
||||
else:
|
||||
curef = "PRD-63764-001"
|
||||
main(sess, serid, curef)
|
||||
fc = FotaCheck()
|
||||
#fc.serid = "3531510"
|
||||
fc.curef = "PRD-63117-011"
|
||||
fc.fv = "AAM481"
|
||||
fc.osvs = "7.1.1"
|
||||
fc.mode = fc.MODE_OTA
|
||||
fc.cltp = 10
|
||||
fc.cktp = 2
|
||||
|
||||
check_xml = fc.do_check()
|
||||
print(fc.pretty_xml(check_xml))
|
||||
curef, fv, tv, fwid, fileid, fn, fs, fh = fc.parse_check(check_xml)
|
||||
|
||||
req_xml = fc.do_request(curef, fv, tv, fwid)
|
||||
#print(fc.pretty_xml(req_xml))
|
||||
fileid, fileurl, slaves = fc.parse_request(req_xml)
|
||||
|
||||
for s in slaves:
|
||||
print("https://{}{}".format(s, fileurl))
|
||||
|
Loading…
Reference in New Issue
Block a user