1
0
mirror of https://github.com/mbirth/tcl_ota_check.git synced 2024-11-09 22:06:47 +00:00

Merge branch 'rewrite'

This commit is contained in:
Markus Birth 2018-02-11 01:53:03 +01:00
commit 75d23502a3
Signed by: mbirth
GPG Key ID: A9928D7A098C3A9A
27 changed files with 758 additions and 622 deletions

View File

@ -9,19 +9,18 @@ import os
import random import random
import sys import sys
import tcllib from tcllib import argparser
import tcllib.argparser
from tcllib.devices import Device from tcllib.devices import Device
from tcllib.xmltools import pretty_xml from tcllib.dumpmgr import write_info_if_dumps_found
from tcllib.requests import RequestRunner, CheckRequest, DownloadRequest, \
ChecksumRequest, EncryptHeaderRequest, ServerSelector
fc = tcllib.FotaCheck()
dpdesc = """ dpdesc = """
Checks for the latest FULL updates for the specified PRD number or for an OTA from the Checks for the latest FULL updates for the specified PRD number or for an OTA from the
version specified as fvver. version specified as fvver.
""" """
dp = tcllib.argparser.DefaultParser(__file__, dpdesc) dp = argparser.DefaultParser(__file__, dpdesc)
dp.add_argument("prd", nargs=1, help="CU Reference #, e.g. PRD-63117-011") dp.add_argument("prd", nargs=1, help="CU Reference #, e.g. PRD-63117-011")
dp.add_argument("fvver", nargs="?", help="Firmware version to check for OTA updates, e.g. AAM481 (omit to run FULL check)", default="AAA000") dp.add_argument("fvver", nargs="?", help="Firmware version to check for OTA updates, e.g. AAM481 (omit to run FULL check)", default="AAA000")
dp.add_argument("-i", "--imei", help="use specified IMEI instead of default", type=str) dp.add_argument("-i", "--imei", help="use specified IMEI instead of default", type=str)
@ -68,38 +67,59 @@ else:
print("Mode: {}".format(dev.mode)) print("Mode: {}".format(dev.mode))
print("CLTP: {}".format(dev.cltp)) print("CLTP: {}".format(dev.cltp))
fc.reset_session(dev) runner = RequestRunner(ServerSelector())
check_xml = fc.do_check(dev)
print(pretty_xml(check_xml))
curef, fv, tv, fw_id, fileid, fn, fsize, fhash = fc.parse_check(check_xml)
req_xml = fc.do_request(curef, fv, tv, fw_id) # Check for update
print(pretty_xml(req_xml)) chk = CheckRequest(dev)
fileid, fileurl, slaves, encslaves, s3_fileurl, s3_slaves = fc.parse_request(req_xml) runner.run(chk)
if not chk.success:
print("{}".format(chk.error))
sys.exit(2)
chkres = chk.get_result()
print(chkres.pretty_xml())
if encslaves: # Request download
chksum_xml = fc.do_checksum(random.choice(encslaves), fileurl, fileurl) dlr = DownloadRequest(dev, chkres.tvver, chkres.fw_id)
print(pretty_xml(chksum_xml)) runner.run(dlr)
file_addr, sha1_body, sha1_enc_footer, sha1_footer = fc.parse_checksum(chksum_xml) if not dlr.success:
print("{}".format(dlr.error))
sys.exit(3)
dlrres = dlr.get_result()
print(dlrres.pretty_xml())
for s in slaves: if dlrres.encslaves:
print("http://{}{}".format(s, fileurl)) encrunner = RequestRunner(ServerSelector(dlrres.encslaves), https=False)
cks = ChecksumRequest(dlrres.fileurl, dlrres.fileurl)
encrunner.run(cks)
if not cks.success:
print("{}".format(cks.error))
sys.exit(4)
cksres = cks.get_result()
print(cksres.pretty_xml())
for s in s3_slaves: for s in dlrres.slaves:
print("http://{}{}".format(s, s3_fileurl)) print("http://{}{}".format(s, dlrres.fileurl))
for s in dlrres.s3_slaves:
print("http://{}{}".format(s, dlrres.s3_fileurl))
if dev.mode == dev.MODE_STATES["FULL"]: if dev.mode == dev.MODE_STATES["FULL"]:
header = fc.do_encrypt_header(random.choice(encslaves), fileurl) hdr = EncryptHeaderRequest(dlrres.fileurl)
headname = "header_{}.bin".format(tv) encrunner.run(hdr)
if not hdr.success:
print("{}".format(hdr.error))
sys.exit(5)
hdrres = hdr.get_result()
headname = "header_{}.bin".format(chkres.tvver)
headdir = "headers" headdir = "headers"
if not os.path.exists(headdir): if not os.path.exists(headdir):
os.makedirs(headdir) os.makedirs(headdir)
if len(header) == 4194320: if len(hdrres.rawdata) == 4194320:
# TODO: Check sha1sum # TODO: Check sha1sum
print("Header length check passed. Writing to {}.".format(headname)) print("Header length check passed. Writing to {}.".format(headname))
with open(os.path.join(headdir, headname), "wb") as f: with open(os.path.join(headdir, headname), "wb") as f:
f.write(header) f.write(hdrres.rawdata)
else: else:
print("Header length invalid ({}).".format(len(header))) print("Header length invalid ({}).".format(len(hdrres.rawdata)))
tcllib.FotaCheck.write_info_if_dumps_found() write_info_if_dumps_found()

View File

@ -7,22 +7,19 @@
import sys import sys
from requests.exceptions import RequestException from tcllib import ansi, argparser, devlist
import tcllib
import tcllib.argparser
from tcllib import ansi, devlist
from tcllib.devices import DesktopDevice from tcllib.devices import DesktopDevice
from tcllib.dumpmgr import write_info_if_dumps_found
from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector
dev = DesktopDevice() dev = DesktopDevice()
fc = tcllib.FotaCheck()
dpdesc = """ dpdesc = """
Checks for the latest FULL updates for all PRD numbers or only for Checks for the latest FULL updates for all PRD numbers or only for
the PRD specified as prd. the PRD specified as prd.
""" """
dp = tcllib.argparser.DefaultParser(__file__, dpdesc) dp = argparser.DefaultParser(__file__, dpdesc)
dp.add_argument("-p", "--prd", help="CU Reference # to filter scan results", dest="tocheck", nargs="?", default=None, metavar="PRD") dp.add_argument("-p", "--prd", help="CU Reference # to filter scan results", dest="tocheck", nargs="?", default=None, metavar="PRD")
dp.add_argument("-l", "--local", help="Force using local database", dest="local", action="store_true", default=False) dp.add_argument("-l", "--local", help="Force using local database", dest="local", action="store_true", default=False)
args = dp.parse_args(sys.argv[1:]) args = dp.parse_args(sys.argv[1:])
@ -34,27 +31,30 @@ prds = devlist.get_devicelist(local=args.local)
print("List of latest FULL firmware by PRD:") print("List of latest FULL firmware by PRD:")
runner = RequestRunner(ServerVoteSelector())
runner.max_tries = 20
for prd, variant in prds.items(): for prd, variant in prds.items():
model = variant["variant"] model = variant["variant"]
lastver = variant["last_full"] lastver = variant["last_full"]
if prdcheck in prd: if not prdcheck in prd:
try: continue
dev.curef = prd dev.curef = prd
fc.reset_session(dev) chk = CheckRequest(dev)
check_xml = fc.do_check(dev, max_tries=20) runner.run(chk)
curef, fv, tv, fw_id, fileid, fn, fsize, fhash = fc.parse_check(check_xml) if chk.success:
txt_tv = tv result = chk.get_result()
if tv != lastver: txt_tv = result.tvver
txt_tv = "{} (old: {} / OTA: {})".format( if result.tvver != lastver:
ansi.CYAN + txt_tv + ansi.RESET, txt_tv = "{} (old: {} / OTA: {})".format(
ansi.CYAN_DARK + variant["last_full"] + ansi.RESET, ansi.CYAN + txt_tv + ansi.RESET,
variant["last_ota"] ansi.CYAN_DARK + variant["last_full"] + ansi.RESET,
) variant["last_ota"]
else: )
fc.delete_last_dump() else:
print("{}: {} {} ({})".format(prd, txt_tv, fhash, model)) result.delete_dump()
except RequestException as e: print("{}: {} {} ({})".format(prd, txt_tv, result.filehash, model))
print("{}: {}".format(prd, str(e))) else:
continue print("{}: {}".format(prd, str(chk.error)))
tcllib.FotaCheck.write_info_if_dumps_found() write_info_if_dumps_found()

View File

@ -7,22 +7,19 @@
import sys import sys
from requests.exceptions import RequestException from tcllib import ansi, argparser, devlist
import tcllib
import tcllib.argparser
from tcllib import ansi, devlist
from tcllib.devices import MobileDevice from tcllib.devices import MobileDevice
from tcllib.dumpmgr import write_info_if_dumps_found
from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector
dev = MobileDevice() dev = MobileDevice()
fc = tcllib.FotaCheck()
dpdesc = """ dpdesc = """
Checks for the latest OTA updates for all PRD numbers or only for the PRD specified Checks for the latest OTA updates for all PRD numbers or only for the PRD specified
as prd. Initial software version can be specified with forcever. as prd. Initial software version can be specified with forcever.
""" """
dp = tcllib.argparser.DefaultParser(__file__, dpdesc) dp = argparser.DefaultParser(__file__, dpdesc)
dp.add_argument("forcever", help="Initial software version to check for OTA updates, e.g. AAM481", nargs="?", default=None) dp.add_argument("forcever", help="Initial software version to check for OTA updates, e.g. AAM481", nargs="?", default=None)
dp.add_argument("-p", "--prd", help="CU Reference # to filter scan results", dest="tocheck", nargs="?", default=None, metavar="PRD") dp.add_argument("-p", "--prd", help="CU Reference # to filter scan results", dest="tocheck", nargs="?", default=None, metavar="PRD")
dp.add_argument("-l", "--local", help="Force using local database", dest="local", action="store_true", default=False) dp.add_argument("-l", "--local", help="Force using local database", dest="local", action="store_true", default=False)
@ -40,23 +37,26 @@ prds = devlist.get_devicelist(local=args.local)
print("List of latest OTA firmware{} by PRD:".format(force_ver_text)) print("List of latest OTA firmware{} by PRD:".format(force_ver_text))
runner = RequestRunner(ServerVoteSelector())
runner.max_tries = 20
for prd, variant in prds.items(): for prd, variant in prds.items():
model = variant["variant"] model = variant["variant"]
lastver = variant["last_ota"] lastver = variant["last_ota"]
lastver = variant["last_full"] if lastver is None else lastver lastver = variant["last_full"] if lastver is None else lastver
if args.forcever is not None: if args.forcever is not None:
lastver = args.forcever lastver = args.forcever
if prdcheck in prd: if not prdcheck in prd:
try: continue
dev.curef = prd dev.curef = prd
dev.fwver = lastver dev.fwver = lastver
fc.reset_session(dev) chk = CheckRequest(dev)
check_xml = fc.do_check(dev, max_tries=20) runner.run(chk)
curef, fv, tv, fw_id, fileid, fn, fsize, fhash = fc.parse_check(check_xml) if chk.success:
versioninfo = ansi.YELLOW_DARK + fv + ansi.RESET + "" + ansi.YELLOW + tv + ansi.RESET + " (FULL: {})".format(variant["last_full"]) result = chk.get_result()
print("{}: {} {} ({})".format(prd, versioninfo, fhash, model)) versioninfo = ansi.YELLOW_DARK + result.fvver + ansi.RESET + "" + ansi.YELLOW + result.tvver + ansi.RESET + " (FULL: {})".format(variant["last_full"])
except RequestException as e: print("{}: {} {} ({})".format(prd, versioninfo, result.filehash, model))
print("{} ({}): {}".format(prd, lastver, str(e))) else:
continue print("{} ({}): {}".format(prd, lastver, chk.error))
tcllib.FotaCheck.write_info_if_dumps_found() write_info_if_dumps_found()

View File

@ -8,22 +8,17 @@
import collections import collections
import sys import sys
from requests.exceptions import RequestException, Timeout from tcllib import ansi, argparser, devlist
from tcllib.devices import DesktopDevice
from tcllib.dumpmgr import write_info_if_dumps_found
from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector
import tcllib
import tcllib.argparser
from tcllib import ansi, devlist
from tcllib.devices import DesktopDevice, MobileDevice
dev = DesktopDevice()
fc = tcllib.FotaCheck()
dpdesc = """ dpdesc = """
Finds new PRD numbers for all known variants, or specified variants with tocheck. Scan range Finds new PRD numbers for all known variants, or specified variants with tocheck. Scan range
can be set by floor and ceiling switches. can be set by floor and ceiling switches.
""" """
dp = tcllib.argparser.DefaultParser(__file__, dpdesc) dp = argparser.DefaultParser(__file__, dpdesc)
dp.add_argument("tocheck", help="CU Reference # to filter scan results", nargs="?", default=None) dp.add_argument("tocheck", help="CU Reference # to filter scan results", nargs="?", default=None)
dp.add_argument("-f", "--floor", help="Beginning of scan range", dest="floor", nargs="?", type=int, default=0) dp.add_argument("-f", "--floor", help="Beginning of scan range", dest="floor", nargs="?", type=int, default=0)
dp.add_argument("-c", "--ceiling", help="End of scan range", dest="ceiling", nargs="?", type=int, default=999) dp.add_argument("-c", "--ceiling", help="End of scan range", dest="ceiling", nargs="?", type=int, default=999)
@ -58,6 +53,11 @@ if args.tocheck is not None:
if not prddict: if not prddict:
prddict[args.tocheck] = [] prddict[args.tocheck] = []
dev = DesktopDevice()
runner = RequestRunner(ServerVoteSelector(), https=False)
runner.max_tries = 20
for center in sorted(prddict.keys()): for center in sorted(prddict.keys()):
tails = [int(i) for i in prddict[center]] tails = [int(i) for i in prddict[center]]
safes = [g for g in range(floor, ceiling) if g not in tails] safes = [g for g in range(floor, ceiling) if g not in tails]
@ -69,15 +69,13 @@ for center in sorted(prddict.keys()):
done_count += 1 done_count += 1
print("Checking {} ({}/{})".format(curef, done_count, total_count)) print("Checking {} ({}/{})".format(curef, done_count, total_count))
print(ansi.UP_DEL, end="") print(ansi.UP_DEL, end="")
try: dev.curef = curef
dev.curef = curef chk = CheckRequest(dev)
fc.reset_session(dev) runner.run(chk)
check_xml = fc.do_check(dev, https=False, max_tries=20) if chk.success:
curef, fv, tv, fw_id, fileid, fn, fsize, fhash = fc.parse_check(check_xml) chkres = chk.get_result()
txt_tv = tv txt_tv = chkres.tvver
print("{}: {} {}".format(curef, txt_tv, fhash)) print("{}: {} {}".format(curef, txt_tv, chkres.filehash))
except (SystemExit, RequestException, Timeout) as e:
continue
print("Scan complete.") print("Scan complete.")
tcllib.FotaCheck.write_info_if_dumps_found() write_info_if_dumps_found()

View File

@ -7,25 +7,20 @@
import sys import sys
from requests.exceptions import RequestException, Timeout from tcllib import ansi, argparser, devlist
from tcllib.devices import DesktopDevice
import tcllib from tcllib.dumpmgr import write_info_if_dumps_found
import tcllib.argparser from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector
from tcllib import ansi, devlist
from tcllib.devices import DesktopDevice, MobileDevice
# Variants to scan for # Variants to scan for
SCAN_VARIANTS = ["001", "003", "009", "010", "700"] SCAN_VARIANTS = ["001", "003", "009", "010", "700"]
dev = DesktopDevice()
fc = tcllib.FotaCheck()
dpdesc = """ dpdesc = """
Finds new PRD numbers for a range of variants. Scan range can be set by Finds new PRD numbers for a range of variants. Scan range can be set by
floor and ceiling switches. floor and ceiling switches.
""" """
dp = tcllib.argparser.DefaultParser(__file__, dpdesc) dp = argparser.DefaultParser(__file__, dpdesc)
dp.add_argument("floor", nargs="?", help="Model number to start with", type=int, default=63116) dp.add_argument("floor", nargs="?", help="Model number to start with", type=int, default=63116)
dp.add_argument("ceiling", nargs="?", help="Model number to end with", type=int, default=99999) dp.add_argument("ceiling", nargs="?", help="Model number to end with", type=int, default=99999)
dp.add_argument("-l", "--local", help="Force using local database", dest="local", action="store_true", default=False) dp.add_argument("-l", "--local", help="Force using local database", dest="local", action="store_true", default=False)
@ -50,21 +45,24 @@ to_scan = scan_list - known_centers
total_count = len(to_scan) * len(SCAN_VARIANTS) total_count = len(to_scan) * len(SCAN_VARIANTS)
done_count = 0 done_count = 0
dev = DesktopDevice()
runner = RequestRunner(ServerVoteSelector(), https=False)
runner.max_tries = 20
for center in to_scan: for center in to_scan:
for j in SCAN_VARIANTS: for j in SCAN_VARIANTS:
curef = "PRD-{:05}-{:3}".format(center, j) curef = "PRD-{:05}-{:3}".format(center, j)
done_count += 1 done_count += 1
print("Checking {} ({}/{})".format(curef, done_count, total_count)) print("Checking {} ({}/{})".format(curef, done_count, total_count))
print(ansi.UP_DEL, end="") print(ansi.UP_DEL, end="")
try: dev.curef = curef
dev.curef = curef chk = CheckRequest(dev)
fc.reset_session(dev) runner.run(chk)
check_xml = fc.do_check(dev, https=False, max_tries=20) if chk.success:
curef, fv, tv, fw_id, fileid, fn, fsize, fhash = fc.parse_check(check_xml) chkres = chk.get_result()
txt_tv = tv txt_tv = chkres.tvver
print("{}: {} {}".format(curef, txt_tv, fhash)) print("{}: {} {}".format(curef, txt_tv, chkres.filehash))
except (SystemExit, RequestException, Timeout) as e:
continue
print("Scan complete.") print("Scan complete.")
tcllib.FotaCheck.write_info_if_dumps_found() write_info_if_dumps_found()

View File

@ -7,32 +7,28 @@
import sys import sys
from requests.exceptions import RequestException, Timeout from tcllib import ansi, argparser, devlist
from tcllib.devices import MobileDevice
from tcllib.dumpmgr import write_info_if_dumps_found
from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector
import tcllib
import tcllib.argparser
from tcllib import ansi
from tcllib.devices import DesktopDevice, MobileDevice
dev = MobileDevice()
fc = tcllib.FotaCheck()
dpdesc = """ dpdesc = """
Finds all valid OTA updates for a given PRD. Scan range can be set by Finds all valid OTA updates for a given PRD. Scan range can be set by
startver and endver switches. startver and endver switches.
""" """
dp = tcllib.argparser.DefaultParser(__file__, dpdesc) dp = argparser.DefaultParser(__file__, dpdesc)
dp.add_argument("prd", help="CU Reference #, e.g. PRD-63117-011") dp.add_argument("prd", help="CU Reference #, e.g. PRD-63117-011")
dp.add_argument("startver", help="Beginning of scan range", nargs="?", default="AAA000") dp.add_argument("startver", help="Beginning of scan range", nargs="?", default="AAA000")
dp.add_argument("endver", help="End of scan range", nargs="?", default="AAZ999") dp.add_argument("endver", help="End of scan range", nargs="?", default="AAZ999")
args = dp.parse_args(sys.argv[1:]) args = dp.parse_args(sys.argv[1:])
fc.curef = args.prd dev = MobileDevice()
dev.curef = args.prd
start_ver = args.startver start_ver = args.startver
end_ver = args.endver end_ver = args.endver
print("Valid firmwares for model {} (between {} and {}):".format(fc.curef, start_ver, end_ver)) print("Valid firmwares for model {} (between {} and {}):".format(dev.curef, start_ver, end_ver))
cur_ver = start_ver cur_ver = start_ver
allvers = [] allvers = []
@ -53,21 +49,22 @@ while True:
break break
cur_ver = "{:3}{:03d}".format("".join(letters), num) cur_ver = "{:3}{:03d}".format("".join(letters), num)
runner = RequestRunner(ServerVoteSelector(), https=False)
runner.max_tries = 20
done_count = 0 done_count = 0
total_count = len(allvers) total_count = len(allvers)
for fv in allvers: for fv in allvers:
done_count += 1 done_count += 1
print("Checking {} ({}/{})".format(fv, done_count, total_count)) print("Checking {} ({}/{})".format(fv, done_count, total_count))
print(ansi.UP_DEL, end="") print(ansi.UP_DEL, end="")
try: dev.fwver = fv
dev.fwver = fv chk = CheckRequest(dev)
fc.reset_session(dev) runner.run(chk)
check_xml = fc.do_check(dev, https=False, max_tries=20) if chk.success:
curef, fv, tv, fw_id, fileid, fn, fsize, fhash = fc.parse_check(check_xml) chkres = chk.get_result()
txt_tv = tv txt_tv = chkres.tvver
print("{}: {}{} {}".format(curef, fv, txt_tv, fhash)) print("{}: {}{} {}".format(dev.curef, fv, txt_tv, chkres.filehash))
except (SystemExit, RequestException, Timeout) as e:
continue
print("Scan complete.") print("Scan complete.")
tcllib.FotaCheck.write_info_if_dumps_found() write_info_if_dumps_found()

View File

@ -6,20 +6,18 @@
"""Query existence of missing OTAs.""" """Query existence of missing OTAs."""
import json import json
import requests import requests
from requests.exceptions import RequestException
import tcllib from tcllib import ansi
from tcllib.devices import DesktopDevice, MobileDevice from tcllib.devices import MobileDevice
from tcllib.dumpmgr import write_info_if_dumps_found
from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector
# 1. Fetch list of missing OTAs (e.g. from ancient versions to current) # 1. Fetch list of missing OTAs (e.g. from ancient versions to current)
# 2. Query updates from FOTA servers (and store XML) # 2. Query updates from FOTA servers (and store XML)
# (3. Upload will be done manually with upload_logs.py) # (3. Upload will be done manually with upload_logs.py)
dev = MobileDevice()
fc = tcllib.FotaCheck()
print("Loading list of missing OTAs.") print("Loading list of missing OTAs.")
versions_json = requests.get("https://tclota.birth-online.de/json_otaversions.php").text versions_json = requests.get("https://tclota.birth-online.de/json_otaversions.php").text
@ -30,22 +28,25 @@ for i in versions:
print("Got {} devices and a total of {} missing OTAs.".format(len(versions), num_versions)) print("Got {} devices and a total of {} missing OTAs.".format(len(versions), num_versions))
dev = MobileDevice()
runner = RequestRunner(ServerVoteSelector())
runner.max_tries = 20
num_item = 1 num_item = 1
for prd, data in versions.items(): for prd, data in versions.items():
print("{}:".format(prd), end="", flush=True) print("{}:".format(prd), end="", flush=True)
for ver in data["missing_froms"]: for ver in data["missing_froms"]:
print(" {}".format(ver), end="", flush=True) print(" {}".format(ver), end="", flush=True)
try: dev.curef = prd
dev.curef = prd dev.fwver = ver
dev.fwver = ver chk = CheckRequest(dev)
fc.reset_session(dev) runner.run(chk)
check_xml = fc.do_check(dev, max_tries=20) if chk.success:
curef, fv, tv, fw_id, fileid, fn, fsize, fhash = fc.parse_check(check_xml)
print("", end="", flush=True) print("", end="", flush=True)
except RequestException as e: num_item += 1
else:
print("", end="", flush=True) print("", end="", flush=True)
continue
num_item += 1
print("") print("")
tcllib.FotaCheck.write_info_if_dumps_found() write_info_if_dumps_found()

View File

@ -5,14 +5,10 @@
"""Return checksum for given firmware.""" """Return checksum for given firmware."""
import random
import sys import sys
import tcllib from tcllib import argparser
import tcllib.argparser from tcllib.requests import RequestRunner, ChecksumRequest, ServerSelector
from tcllib.xmltools import pretty_xml
fc = tcllib.FotaCheck()
encslaves = [ encslaves = [
"54.238.56.196", "54.238.56.196",
@ -29,7 +25,7 @@ encslaves = [
dpdesc = """ dpdesc = """
Returns the checksum for a given firmware URI. Returns the checksum for a given firmware URI.
""" """
dp = tcllib.argparser.DefaultParser(__file__, dpdesc) dp = argparser.DefaultParser(__file__, dpdesc)
dp.add_argument("uri", help="URI to firmware, starts with '/body/...'") dp.add_argument("uri", help="URI to firmware, starts with '/body/...'")
args = dp.parse_args(sys.argv[1:]) args = dp.parse_args(sys.argv[1:])
@ -38,6 +34,13 @@ fileurl = args.uri
# /body/ce570ddc079e2744558f191895e524d02a60476f/32/268932 # /body/ce570ddc079e2744558f191895e524d02a60476f/32/268932
#fileurl = "/body/ce570ddc079e2744558f191895e524d02a60476f/2c23717bb747f3c321195419f451de52efa8ea51/263790/268932" #fileurl = "/body/ce570ddc079e2744558f191895e524d02a60476f/2c23717bb747f3c321195419f451de52efa8ea51/263790/268932"
chksum_xml = fc.do_checksum(random.choice(encslaves), fileurl, fileurl) runner = RequestRunner(ServerSelector(encslaves), https=False)
print(pretty_xml(chksum_xml))
file_addr, sha1_body, sha1_enc_footer, sha1_footer = fc.parse_checksum(chksum_xml) cks = ChecksumRequest(fileurl, fileurl)
runner.run(cks)
if not cks.success:
print("{}".format(cks.error))
sys.exit(4)
cksres = cks.get_result()
print(cksres.pretty_xml())

View File

@ -6,22 +6,19 @@
"""Download a given firmware file.""" """Download a given firmware file."""
import os import os
import random
import sys import sys
import tcllib from tcllib import argparser
import tcllib.argparser
from tcllib.devices import DesktopDevice from tcllib.devices import DesktopDevice
from tcllib.xmltools import pretty_xml from tcllib.dumpmgr import write_info_if_dumps_found
from tcllib.requests import RequestRunner, CheckRequest, DownloadRequest, \
ChecksumRequest, EncryptHeaderRequest, ServerSelector
fc = tcllib.FotaCheck()
dev = DesktopDevice()
dpdesc = """ dpdesc = """
Downloads the given firmware file. Downloads the given firmware file.
""" """
dp = tcllib.argparser.DefaultParser(__file__, dpdesc) dp = argparser.DefaultParser(__file__, dpdesc)
dp.add_argument("prd", nargs=1, help="CU Reference #, e.g. PRD-63117-011") dp.add_argument("prd", nargs=1, help="CU Reference #, e.g. PRD-63117-011")
dp.add_argument("targetversion", nargs=1, help="Firmware version to download, e.g. AAN990") dp.add_argument("targetversion", nargs=1, help="Firmware version to download, e.g. AAN990")
dp.add_argument("fwid", nargs=1, help="Numeric firmware file id, e.g. 268932") dp.add_argument("fwid", nargs=1, help="Numeric firmware file id, e.g. 268932")
@ -32,6 +29,7 @@ dp.add_argument("--rawmode", help="override --mode with raw value (2=OTA, 4=FULL
dp.add_argument("--rawcltp", help="override --type with raw value (10=MOBILE, 2010=DESKTOP)", metavar="CLTP") dp.add_argument("--rawcltp", help="override --type with raw value (10=MOBILE, 2010=DESKTOP)", metavar="CLTP")
args = dp.parse_args(sys.argv[1:]) args = dp.parse_args(sys.argv[1:])
dev = DesktopDevice()
def sel_mode(defaultmode, rawval): def sel_mode(defaultmode, rawval):
"""Handle custom mode.""" """Handle custom mode."""
@ -65,30 +63,42 @@ dev.cltp = sel_cltp(args.type, args.rawcltp)
print("Mode: {}".format(dev.mode)) print("Mode: {}".format(dev.mode))
print("CLTP: {}".format(dev.cltp)) print("CLTP: {}".format(dev.cltp))
fv = dev.fwver runner = RequestRunner(ServerSelector())
runner.max_tries = 20
tv = args.targetversion[0] tv = args.targetversion[0]
fw_id = args.fwid[0] fw_id = args.fwid[0]
req_xml = fc.do_request(dev.curef, fv, tv, fw_id) dlr = DownloadRequest(dev, tv, fw_id)
print(pretty_xml(req_xml)) runner.run(dlr)
fileid, fileurl, slaves, encslaves, s3_fileurl, s3_slaves = fc.parse_request(req_xml) if not dlr.success:
print("ERROR: {}".format(dlr.error))
sys.exit(3)
for s in slaves: dlrres = dlr.get_result()
print("http://{}{}".format(s, fileurl)) print(dlrres.pretty_xml())
for s in s3_slaves: for s in dlrres.slaves:
print("http://{}{}".format(s, s3_fileurl)) print("http://{}{}".format(s, dlrres.fileurl))
for s in dlrres.s3_slaves:
print("http://{}{}".format(s, dlrres.s3_fileurl))
if dev.mode == dev.MODE_STATES["FULL"]: if dev.mode == dev.MODE_STATES["FULL"]:
header = fc.do_encrypt_header(random.choice(encslaves), fileurl) encrun = RequestRunner(ServerSelector(dlrres.encslaves), https=False)
headname = "header_{}.bin".format(tv) encrun.max_tries = 20
headdir = "headers" hdr = EncryptHeaderRequest(dlrres.fileurl)
if not os.path.exists(headdir): encrun.run(hdr)
os.makedirs(headdir) if hdr.success:
if len(header) == 4194320: hdrres = hdr.get_result()
print("Header length check passed. Writing to {}.".format(headname)) headname = "header_{}.bin".format(tv)
with open(os.path.join(headdir, headname), "wb") as f: headdir = "headers"
f.write(header) if not os.path.exists(headdir):
else: os.makedirs(headdir)
print("Header length invalid ({}).".format(len(header))) if len(hdrres.rawdata) == 4194320:
print("Header length check passed. Writing to {}.".format(headname))
with open(os.path.join(headdir, headname), "wb") as f:
f.write(hdrres.rawdata)
else:
print("Header length invalid ({}).".format(len(hdrres.rawdata)))
tcllib.FotaCheck.write_info_if_dumps_found() write_info_if_dumps_found()

View File

@ -1,35 +1,9 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Library for TCL API work and related functions.""" """Library for TCL API work and related functions."""
import requests from .ansi import *
from .argparser import *
from . import (dumpmgr, servervote, tclcheck, tclchecksum, tclencheader, from .devices import *
tclrequest) from .devlist import *
from .dumpmgr import *
class FotaCheck(
tclcheck.TclCheckMixin,
tclrequest.TclRequestMixin,
tclchecksum.TclChecksumMixin,
tclencheader.TclEncHeaderMixin,
servervote.ServerVoteMixin,
dumpmgr.DumpMgrMixin
):
"""Main API handler class."""
def __init__(self):
"""Handle mixins and populate variables."""
super().__init__()
self.reset_session()
def reset_session(self, device=None):
"""Reset everything to default."""
self.g2master = self.get_master_server()
self.sess = requests.Session()
if device:
self.sess.headers.update({"User-Agent": device.ua})
return self.sess

View File

@ -15,23 +15,32 @@ from math import floor
from . import ansi from . import ansi
class DumpMgrMixin: def get_timestamp_random():
"""A mixin component for XML dump management.""" """Generate timestamp + random part to avoid collisions."""
millis = floor(time.time() * 1000)
tail = "{:06d}".format(random.randint(0, 999999))
return "{}_{}".format(str(millis), tail)
def write_info_if_dumps_found():
"""Notify user to upload dumps if present."""
# To disable this info, uncomment the following line.
# return
files = glob.glob(os.path.normpath("logs/*.xml"))
if files:
print()
print("{}There are {} logs collected in the logs/ directory.{} Please consider uploading".format(ansi.YELLOW, len(files), ansi.RESET))
print("them to https://tclota.birth-online.de/ by running {}./upload_logs.py{}.".format(ansi.CYAN, ansi.RESET))
class DumpMgr:
"""A class for XML dump management."""
def __init__(self): def __init__(self):
"""Populate dump file name.""" """Populate dump file name."""
self.last_dump_filename = None self.last_dump_filename = None
@staticmethod
def get_timestamp_random():
"""Generate timestamp + random part to avoid collisions."""
millis = floor(time.time() * 1000)
tail = "{:06d}".format(random.randint(0, 999999))
return "{}_{}".format(str(millis), tail)
def write_dump(self, data): def write_dump(self, data):
"""Write dump to file.""" """Write dump to file."""
outfile = os.path.normpath("logs/{}.xml".format(self.get_timestamp_random())) outfile = os.path.normpath("logs/{}.xml".format(get_timestamp_random()))
if not os.path.exists(os.path.dirname(outfile)): if not os.path.exists(os.path.dirname(outfile)):
try: try:
os.makedirs(os.path.dirname(outfile)) os.makedirs(os.path.dirname(outfile))
@ -47,14 +56,3 @@ class DumpMgrMixin:
if self.last_dump_filename: if self.last_dump_filename:
os.unlink(self.last_dump_filename) os.unlink(self.last_dump_filename)
self.last_dump_filename = None self.last_dump_filename = None
@staticmethod
def write_info_if_dumps_found():
"""Notify user to upload dumps if present."""
# To disable this info, uncomment the following line.
# return
files = glob.glob(os.path.normpath("logs/*.xml"))
if files:
print()
print("{}There are {} logs collected in the logs/ directory.{} Please consider uploading".format(ansi.YELLOW, len(files), ansi.RESET))
print("them to https://tclota.birth-online.de/ by running {}./upload_logs.py{}.".format(ansi.CYAN, ansi.RESET))

View File

@ -0,0 +1,8 @@
# -*- coding: utf-8 -*-
from .checkrequest import CheckRequest
from .downloadrequest import DownloadRequest
from .checksumrequest import ChecksumRequest
from .encryptheaderrequest import EncryptHeaderRequest
from .runner import *
from .serverselector import *

View File

@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
from collections import OrderedDict
from .. import devices
from .tclrequest import TclRequest
from .tclresult import CheckResult
class CheckRequest(TclRequest):
def __init__(self, device: devices.Device):
super().__init__()
self.uri = "/check.php"
self.method = "GET"
self.device = device
def get_headers(self):
return {"User-Agent": self.device.ua}
def get_params(self):
params = OrderedDict()
params["id"] = self.device.imei
params["curef"] = self.device.curef
params["fv"] = self.device.fwver
params["mode"] = self.device.mode
params["type"] = self.device.type
params["cltp"] = self.device.cltp
params["cktp"] = self.device.cktp
params["rtd"] = self.device.rtd
params["chnl"] = self.device.chnl
#params["osvs"] = self.device.osvs
#params["ckot"] = self.device.ckot
return params
def is_done(self, http_status: int, contents: str) -> bool:
ok_states = {
204: "No update available.",
404: "No data for requested CUREF/FV combination.",
}
if http_status == 200:
self.response = contents
self.result = CheckResult(contents)
self.success = True
return True
elif http_status in ok_states:
self.error = ok_states[http_status]
self.success = False
return True
elif http_status not in [500, 502, 503]:
# Errors OTHER than 500, 502 or 503 are probably
# errors where we don't need to retry
self.error ="HTTP {}.".format(http_status)
self.success = False
return True
return False
# Check requests have 4 possible outcomes:
# 1. HTTP 200 with XML data - our desired info
# 2. HTTP 204 - means: no newer update available
# 3. HTTP 404 - means: invalid device or firmware version
# 4. anything else: server problem (esp. 500, 502, 503)

View File

@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
from collections import OrderedDict
import json
from .. import credentials, devices
from .tclrequest import TclRequest
from .tclresult import ChecksumResult
class ChecksumRequest(TclRequest):
def __init__(self, address, file_uri):
super().__init__()
# NOTE: THIS HAS TO BE RUN ON AN ENCSLAVE
self.uri = "/checksum.php"
self.method = "POST"
self.address = address
self.file_uri = file_uri
def get_headers(self):
return {"User-Agent": "tcl"}
def get_params(self):
params = OrderedDict()
params.update(credentials.get_creds2())
payload = {self.address: self.file_uri}
payload_json = json.dumps(payload)
params["address"] = bytes(payload_json, "utf-8")
return params
def is_done(self, http_status: int, contents: str) -> bool:
if http_status == 200:
# <ENCRYPT_FOOTER>2abfa6f6507044fec995efede5d818e62a0b19b5</ENCRYPT_FOOTER> means ERROR (invalid ADDRESS!)
if "<ENCRYPT_FOOTER>2abfa6f6507044fec995efede5d818e62a0b19b5</ENCRYPT_FOOTER>" in contents:
self.error = "INVALID URI: {}".format(self.file_uri)
self.success = False
return True
self.response = contents
self.result = ChecksumResult(contents)
self.success = True
return True
self.error = "HTTP {}".format(http_status)
self.success = False
return True

View File

@ -0,0 +1,82 @@
# -*- coding: utf-8 -*-
import binascii
import hashlib
import random
import time
import zlib
from collections import OrderedDict
from math import floor
from .. import devices
from .tclrequest import TclRequest
from .tclresult import DownloadResult
VDKEY_B64Z = b"eJwdjwEOwDAIAr8kKFr//7HhmqXp8AIIDrYAgg8byiUXrwRJRXja+d6iNxu0AhUooDCN9rd6rDLxmGIakUVWo3IGCTRWqCAt6X4jGEIUAxgN0eYWnp+LkpHQAg/PsO90ELsy0Npm/n2HbtPndFgGEV31R9OmT4O4nrddjc3Qt6nWscx7e+WRHq5UnOudtjw5skuV09pFhvmqnOEIs4ljPeel1wfLYUF4\n"
def get_salt():
"""Generate cryptographic salt."""
millis = floor(time.time() * 1000)
tail = "{:06d}".format(random.randint(0, 999999))
return "{}{}".format(str(millis), tail)
def get_vk2(params_dict, cltp):
"""Generate salted hash of API parameters."""
params_dict["cltp"] = cltp
query = ""
for key, val in params_dict.items():
if query:
query += "&"
query += key + "=" + str(val)
vdk = zlib.decompress(binascii.a2b_base64(VDKEY_B64Z))
query += vdk.decode("utf-8")
engine = hashlib.sha1()
engine.update(bytes(query, "utf-8"))
hexhash = engine.hexdigest()
return hexhash
class DownloadRequest(TclRequest):
def __init__(self, device: devices.Device, tvver: str, fw_id: str):
super().__init__()
self.uri = "/download_request.php"
self.method = "POST"
self.device = device
self.tvver = tvver
self.fw_id = fw_id
def get_headers(self):
return {"User-Agent": self.device.ua}
def get_params(self):
params = OrderedDict()
params["id"] = self.device.imei
params["salt"] = get_salt()
params["curef"] = self.device.curef
params["fv"] = self.device.fwver
params["tv"] = self.tvver
params["type"] = self.device.type
params["fw_id"] = self.fw_id
params["mode"] = self.device.mode
params["vk"] = get_vk2(params, self.device.cltp)
params["cltp"] = self.device.cltp
params["cktp"] = self.device.cktp
params["rtd"] = self.device.rtd
if self.device.mode == self.device.MODE_STATES["FULL"]:
params["foot"] = 1
params["chnl"] = self.device.chnl
return params
def is_done(self, http_status: int, contents: str) -> bool:
if http_status == 200:
self.response = contents
self.result = DownloadResult(contents)
self.success = True
return True
elif http_status not in [500, 502, 503]:
# Errors OTHER than 500, 502 or 503 are probably
# errors where we don't need to retry
self.error = "HTTP {}".format(http_status)
self.success = False
return True
return False

View File

@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
from collections import OrderedDict
from .. import credentials, devices
from .tclrequest import TclRequest
from .tclresult import EncryptHeaderResult
class EncryptHeaderRequest(TclRequest):
def __init__(self, file_uri):
super().__init__()
# NOTE: THIS HAS TO BE RUN ON AN ENCSLAVE
self.uri = "/encrypt_header.php"
self.rawmode = True
self.method = "POST"
self.file_uri = file_uri
def get_headers(self):
return {"User-Agent": "tcl"}
def get_params(self):
params = OrderedDict()
params.update(credentials.get_creds2())
params["address"] = bytes(self.file_uri, "utf-8")
return params
def is_done(self, http_status: int, contents: str) -> bool:
# Expect "HTTP 206 Partial Content" response
if http_status == 206:
self.result = EncryptHeaderResult(contents)
self.success = True
return True
self.error = "HTTP {}".format(http_status)
self.success = False
return True

38
tcllib/requests/http.py Normal file
View File

@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
import requests
from collections import OrderedDict
class TimeoutException(Exception):
pass
class HttpRequest:
"""Provides all generic features for making HTTP GET requests"""
def __init__(self, url, timeout=10):
self.url = url
self.params = OrderedDict()
self.timeout = timeout
self.headers = {}
def reset_session(self):
"""Reset everything to default."""
self.sess = requests.Session()
self.sess.headers.update(self.headers)
def run(self):
"""Run query."""
try:
req = self.sess.get(self.url, params=self.params, timeout=self.timeout)
except requests.exceptions.Timeout as e:
raise TimeoutException(e)
return req
class HttpPostRequest(HttpRequest):
"""Provides all generic features for making HTTP POST requests"""
def run(self):
"""Run query."""
try:
req = self.sess.post(self.url, data=self.params, timeout=self.timeout)
except requests.exceptions.Timeout as e:
raise TimeoutException(e)
return req

51
tcllib/requests/runner.py Normal file
View File

@ -0,0 +1,51 @@
# -*- coding: utf-8 -*-
from .tclrequest import TclRequest
from . import http
from . import serverselector
class UnknownMethodException(Exception):
pass
class RequestRunner:
def __init__(self, server_selector: serverselector.ServerSelector, https=True):
self.server_selector = server_selector
self.protocol = "https://" if https else "http://"
self.max_tries = 5
def get_http(self, method="GET") -> http.HttpRequest:
"""Returns the http class according to desired method."""
if method == "GET":
return http.HttpRequest
elif method == "POST":
return http.HttpPostRequest
raise UnknownMethodException("Unknown http method: {}".format(method))
def get_server(self) -> str:
"""Returns a master server."""
return self.server_selector.get_master_server()
def run(self, query: TclRequest, timeout: int=10) -> bool:
"""Runs the actual query."""
for _ in range(0, self.max_tries):
url = "{}{}{}".format(self.protocol, self.get_server(), query.uri)
http_handler = self.get_http(query.method)(url, timeout)
http_handler.headers = query.get_headers()
http_handler.params = query.get_params()
http_handler.reset_session()
self.server_selector.hook_prerequest()
try:
req = http_handler.run()
if query.rawmode:
done = query.is_done(req.status_code, req.content)
else:
req.encoding = "utf-8"
done = query.is_done(req.status_code, req.text)
self.server_selector.hook_postrequest(done)
if done:
return done
except http.TimeoutException:
self.server_selector.hook_postrequest(False)
query.error = "Timeout."
query.error = "Max tries ({}) reached.".format(self.max_tries)
return False

View File

@ -0,0 +1,109 @@
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Tools to sort API servers to find the least awful one."""
import numpy
import time
MASTER_SERVERS = [
"g2master-us-east.tclclouds.com",
"g2master-us-west.tclclouds.com",
"g2master-eu-west.tclclouds.com",
"g2master-ap-south.tclclouds.com",
"g2master-ap-north.tclclouds.com",
"g2master-sa-east.tclclouds.com",
]
class ServerSelector:
"""Returns a random server to use."""
def __init__(self, server_list=None):
"""Init stuff"""
if server_list:
self.server_list = server_list
else:
self.server_list = MASTER_SERVERS
self.last_server = None
def get_master_server(self):
"""Return a random server."""
while True:
new_server = numpy.random.choice(self.server_list)
if new_server != self.last_server:
break
self.last_server = new_server
return new_server
def hook_prerequest(self):
"""Hook to be called before doing request"""
pass
def hook_postrequest(self, successful: bool):
"""Hook to be called after request finished"""
pass
class ServerVoteSelector(ServerSelector):
"""Tries to return faster servers more often."""
def __init__(self, server_list=None):
"""Populate server list and weighting variables."""
super().__init__(server_list)
self.servers_weights = [3] * len(self.server_list)
self.check_time_sum = 3
self.check_time_count = 1
def get_master_server(self):
"""Return weighted choice from server list."""
weight_sum = 0
for i in self.servers_weights:
weight_sum += i
numpy_weights = []
for i in self.servers_weights:
numpy_weights.append(i/weight_sum)
self.last_server = numpy.random.choice(self.server_list, p=numpy_weights)
return self.last_server
def master_server_downvote(self):
"""Decrease weight of last chosen server."""
idx = self.server_list.index(self.last_server)
if self.servers_weights[idx] > 1:
self.servers_weights[idx] -= 1
def master_server_upvote(self):
"""Increase weight of last chosen server."""
idx = self.server_list.index(self.last_server)
if self.servers_weights[idx] < 10:
self.servers_weights[idx] += 1
def check_time_add(self, duration):
"""Record connection time."""
self.check_time_sum += duration
self.check_time_count += 1
def check_time_avg(self):
"""Return average connection time."""
return self.check_time_sum / self.check_time_count
def master_server_vote_on_time(self, last_duration):
"""Change weight of a server based on average connection time."""
avg_duration = self.check_time_avg()
if last_duration < avg_duration - 0.5:
self.master_server_upvote()
elif last_duration > avg_duration + 0.5:
self.master_server_downvote()
def hook_prerequest(self):
"""Hook to be called before doing request"""
self.reqtime_start = time.perf_counter()
def hook_postrequest(self, successful: bool):
"""Hook to be called after request finished"""
reqtime = time.perf_counter() - self.reqtime_start
self.check_time_add(reqtime)
if successful:
self.master_server_vote_on_time(reqtime)
else:
self.master_server_downvote()

View File

@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
from . import tclresult
class TclRequest:
def __init__(self):
self.uri = ""
self.rawmode = False
self.response = None
self.result = None
self.error = None
self.success = False
def get_headers(self):
return {}
def get_params(self):
return {}
def is_done(self, http_status: int, contents: str):
"""Checks if query is done or needs retry."""
return False
def get_result(self) -> tclresult.TclResult:
"""Returns Result object."""
return self.result

View File

@ -0,0 +1,68 @@
# -*- coding: utf-8 -*-
import xml.dom.minidom
from defusedxml import ElementTree
from .. import dumpmgr
class TclResult:
def __init__(self, xml: str):
self.raw_xml = xml
self.dumper = dumpmgr.DumpMgr()
self.dumper.write_dump(xml)
def delete_dump(self):
self.dumper.delete_last_dump()
def pretty_xml(self):
"""Return prettified input XML with ``xml.dom.minidom``."""
mdx = xml.dom.minidom.parseString(self.raw_xml)
return mdx.toprettyxml(indent=" ")
class CheckResult(TclResult):
def __init__(self, xml: str):
super().__init__(xml)
root = ElementTree.fromstring(xml)
self.curef = root.find("CUREF").text
self.fvver = root.find("VERSION").find("FV").text
self.tvver = root.find("VERSION").find("TV").text
self.fw_id = root.find("FIRMWARE").find("FW_ID").text
fileinfo = root.find("FIRMWARE").find("FILESET").find("FILE")
self.fileid = fileinfo.find("FILE_ID").text
self.filename = fileinfo.find("FILENAME").text
self.filesize = fileinfo.find("SIZE").text
self.filehash = fileinfo.find("CHECKSUM").text
class DownloadResult(TclResult):
def __init__(self, xml: str):
super().__init__(xml)
root = ElementTree.fromstring(xml)
file = root.find("FILE_LIST").find("FILE")
self.fileid = file.find("FILE_ID").text
self.fileurl = file.find("DOWNLOAD_URL").text
s3_fileurl_node = file.find("S3_DOWNLOAD_URL")
self.s3_fileurl = None
if s3_fileurl_node is not None:
self.s3_fileurl = s3_fileurl_node.text
slave_list = root.find("SLAVE_LIST").findall("SLAVE")
enc_list = root.find("SLAVE_LIST").findall("ENCRYPT_SLAVE")
s3_slave_list = root.find("SLAVE_LIST").findall("S3_SLAVE")
self.slaves = [s.text for s in slave_list]
self.encslaves = [s.text for s in enc_list]
self.s3_slaves = [s.text for s in s3_slave_list]
class ChecksumResult(TclResult):
def __init__(self, xml: str):
super().__init__(xml)
root = ElementTree.fromstring(xml)
file = root.find("FILE_CHECKSUM_LIST").find("FILE")
self.file_addr = file.find("ADDRESS").text
self.sha1_enc_footer = file.find("ENCRYPT_FOOTER").text
self.sha1_footer = file.find("FOOTER").text
self.sha1_body = file.find("BODY").text
class EncryptHeaderResult(TclResult):
def __init__(self, contents: str):
self.rawdata = contents

View File

@ -1,66 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Tools to sort API servers to find the least awful one."""
import numpy
class ServerVoteMixin:
"""A mixin component for server sorting."""
def __init__(self):
"""Populate server list and weighting variables."""
self.g2master = None
self.master_servers = [
"g2master-us-east.tclclouds.com",
"g2master-us-west.tclclouds.com",
"g2master-eu-west.tclclouds.com",
"g2master-ap-south.tclclouds.com",
"g2master-ap-north.tclclouds.com",
"g2master-sa-east.tclclouds.com",
]
self.master_servers_weights = [3] * len(self.master_servers)
self.check_time_sum = 3
self.check_time_count = 1
def get_master_server(self):
"""Return weighted choice from server list."""
weight_sum = 0
for i in self.master_servers_weights:
weight_sum += i
numpy_weights = []
for i in self.master_servers_weights:
numpy_weights.append(i/weight_sum)
return numpy.random.choice(self.master_servers, p=numpy_weights)
def master_server_downvote(self):
"""Decrease weight of a server."""
idx = self.master_servers.index(self.g2master)
if self.master_servers_weights[idx] > 1:
self.master_servers_weights[idx] -= 1
def master_server_upvote(self):
"""Increase weight of a server."""
idx = self.master_servers.index(self.g2master)
if self.master_servers_weights[idx] < 10:
self.master_servers_weights[idx] += 1
def check_time_add(self, duration):
"""Record connection time."""
self.check_time_sum += duration
self.check_time_count += 1
def check_time_avg(self):
"""Return average connection time."""
return self.check_time_sum / self.check_time_count
def master_server_vote_on_time(self, last_duration):
"""Change weight of a server based on average connection time."""
avg_duration = self.check_time_avg()
if last_duration < avg_duration - 0.5:
self.master_server_upvote()
elif last_duration > avg_duration + 0.5:
self.master_server_downvote()

View File

@ -1,94 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Tools to interface with TCL's update request API."""
import time
from collections import OrderedDict, defaultdict
import requests
from defusedxml import ElementTree
from .devices import Device
class TclCheckMixin:
"""A mixin component for TCL's update request API."""
def prep_check_url(self, https=True):
"""Prepare URL for update request."""
protocol = "https://" if https else "http://"
url = protocol + self.g2master + "/check.php"
return url
def prep_check(self, device: Device, https=True):
"""Prepare URL and parameters for update request."""
url = self.prep_check_url(https)
params = OrderedDict()
params["id"] = device.imei
params["curef"] = device.curef
params["fv"] = device.fwver
params["mode"] = device.mode
params["type"] = device.type
params["cltp"] = device.cltp
params["cktp"] = device.cktp
params["rtd"] = device.rtd
params["chnl"] = device.chnl
#params["osvs"] = device.osvs
#params["ckot"] = device.ckot
return url, params
def do_check(self, device: Device, https=True, timeout=10, max_tries=5):
"""Perform update request with given parameters."""
url, params = self.prep_check(device, https)
last_response = None
for _ in range(0, max_tries):
try:
reqtime_start = time.perf_counter()
req = self.sess.get(url, params=params, timeout=timeout)
reqtime = time.perf_counter() - reqtime_start
self.check_time_add(reqtime)
last_response = req
if req.status_code == 200:
self.master_server_vote_on_time(reqtime)
req.encoding = "utf-8" # Force encoding as server doesn't give one
self.write_dump(req.text)
return req.text
elif req.status_code not in [500, 502, 503]:
self.do_check_errorhandle(req, reqtime)
except requests.exceptions.Timeout:
pass
# Something went wrong, try a different server
self.master_server_downvote()
self.g2master = self.get_master_server()
url = self.prep_check_url(https)
raise requests.exceptions.RetryError("Max tries ({}) reached.".format(max_tries), response=last_response)
def do_check_errorhandle(self, req, reqtime):
"""Handle non-HTTP 200 results for ``do_check``."""
errcodes = defaultdict(lambda: "HTTP {}.".format(req.status_code))
errcodes[204] = "No update available."
errcodes[404] = "No data for requested CUREF/FV combination."
if req.status_code in [204, 404]:
self.master_server_vote_on_time(reqtime)
elif req.status_code not in [500, 502, 503]:
self.master_server_downvote()
req.raise_for_status()
raise requests.exceptions.HTTPError(errcodes[req.status_code], response=req)
@staticmethod
def parse_check(xmlstr):
"""Parse output of ``do_check``."""
root = ElementTree.fromstring(xmlstr)
curef = root.find("CUREF").text
fvver = root.find("VERSION").find("FV").text
tvver = root.find("VERSION").find("TV").text
fw_id = root.find("FIRMWARE").find("FW_ID").text
fileinfo = root.find("FIRMWARE").find("FILESET").find("FILE")
fileid = fileinfo.find("FILE_ID").text
filename = fileinfo.find("FILENAME").text
filesize = fileinfo.find("SIZE").text
filehash = fileinfo.find("CHECKSUM").text
return curef, fvver, tvver, fw_id, fileid, filename, filesize, filehash

View File

@ -1,56 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Tools to interface with TCL's checksum API."""
import json
from defusedxml import ElementTree
from . import credentials
class TclChecksumMixin:
"""A mixin component for TCL's checksum API."""
@staticmethod
def prep_checksum(encslave, address, uri):
"""Prepare URL and parameters for checksum request."""
url = "http://" + encslave + "/checksum.php"
params = credentials.get_creds2()
payload = {address: uri}
payload_json = json.dumps(payload)
params[b"address"] = bytes(payload_json, "utf-8")
return url, params
def do_checksum(self, encslave, address, uri):
"""Perform checksum request with given parameters."""
url, params = self.prep_checksum(encslave, address, uri)
# print(repr(dict(params)))
req = self.sess.post(url, data=params)
if req.status_code == 200:
req.encoding = "utf-8" # Force encoding as server doesn't give one
self.write_dump(req.text)
# <ENCRYPT_FOOTER>2abfa6f6507044fec995efede5d818e62a0b19b5</ENCRYPT_FOOTER> means ERROR (invalid ADDRESS!)
if "<ENCRYPT_FOOTER>2abfa6f6507044fec995efede5d818e62a0b19b5</ENCRYPT_FOOTER>" in req.text:
print("INVALID URI: {}".format(uri))
raise SystemExit
return req.text
else:
print("CHECKSUM: " + repr(req))
print(repr(req.headers))
print(repr(req.text))
raise SystemExit
@staticmethod
def parse_checksum(xmlstr):
"""Parse output of ``do_checksum``."""
root = ElementTree.fromstring(xmlstr)
file = root.find("FILE_CHECKSUM_LIST").find("FILE")
file_addr = file.find("ADDRESS").text
sha1_enc_footer = file.find("ENCRYPT_FOOTER").text
sha1_footer = file.find("FOOTER").text
sha1_body = file.find("BODY").text
return file_addr, sha1_body, sha1_enc_footer, sha1_footer

View File

@ -1,27 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Tools to interface with TCL's encrypted header API."""
from . import credentials
class TclEncHeaderMixin:
"""A mixin component for TCL's encrypted header API.."""
def do_encrypt_header(self, encslave, address):
"""Perform encrypted header request with given parameters."""
params = credentials.get_creds2()
params[b"address"] = bytes(address, "utf-8")
url = "http://" + encslave + "/encrypt_header.php"
req = self.sess.post(url, data=params, verify=False)
# Expect "HTTP 206 Partial Content" response
if req.status_code == 206:
return req.content
else:
print("ENCRYPT: " + repr(req))
print(repr(req.headers))
print(repr(req.text))
raise SystemExit

View File

@ -1,123 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Tools to interface with TCL's download request API."""
import binascii
import hashlib
import random
import time
import zlib
from collections import OrderedDict
from math import floor
from defusedxml import ElementTree
'''
private HashMap<String, String> buildDownloadUrisParams(UpdatePackageInfo updatePackageInfo) {
FotaLog.m28v(TAG, "doAfterCheck");
String salt = FotaUtil.salt();
HashMap linkedHashMap = new LinkedHashMap();
linkedHashMap.put("id", this.internalBuilder.getParam("id"));
linkedHashMap.put("salt", salt);
linkedHashMap.put("curef", updatePackageInfo.mCuref);
linkedHashMap.put("fv", updatePackageInfo.mFv);
linkedHashMap.put("tv", updatePackageInfo.mTv);
linkedHashMap.put("type", "Firmware");
linkedHashMap.put("fw_id", updatePackageInfo.mFirmwareId);
linkedHashMap.put("mode", "2");
linkedHashMap.put("vk", generateVk2((LinkedHashMap) linkedHashMap.clone()));
linkedHashMap.put("cltp", "10");
linkedHashMap.put("cktp", this.internalBuilder.getParam("cktp"));
linkedHashMap.put("rtd", this.internalBuilder.getParam("rtd"));
linkedHashMap.put("chnl", this.internalBuilder.getParam("chnl"));
return linkedHashMap;
}
'''
VDKEY_B64Z = b"eJwdjwEOwDAIAr8kKFr//7HhmqXp8AIIDrYAgg8byiUXrwRJRXja+d6iNxu0AhUooDCN9rd6rDLxmGIakUVWo3IGCTRWqCAt6X4jGEIUAxgN0eYWnp+LkpHQAg/PsO90ELsy0Npm/n2HbtPndFgGEV31R9OmT4O4nrddjc3Qt6nWscx7e+WRHq5UnOudtjw5skuV09pFhvmqnOEIs4ljPeel1wfLYUF4\n"
def get_salt():
"""Generate cryptographic salt."""
millis = floor(time.time() * 1000)
tail = "{:06d}".format(random.randint(0, 999999))
return "{}{}".format(str(millis), tail)
def get_vk2(params_dict, cltp):
"""Generate salted hash of API parameters."""
params_dict["cltp"] = cltp
query = ""
for key, val in params_dict.items():
if query:
query += "&"
query += key + "=" + str(val)
vdk = zlib.decompress(binascii.a2b_base64(VDKEY_B64Z))
query += vdk.decode("utf-8")
engine = hashlib.sha1()
engine.update(bytes(query, "utf-8"))
hexhash = engine.hexdigest()
return hexhash
class TclRequestMixin:
"""A mixin component for TCL's download request API."""
def prep_request(self, curef, fvver, tvver, fw_id):
"""Prepare URL and device parameters for download request."""
url = "https://" + self.g2master + "/download_request.php"
params = OrderedDict()
params["id"] = self.serid
params["salt"] = get_salt()
params["curef"] = curef
params["fv"] = fvver
params["tv"] = tvver
params["type"] = self.ftype
params["fw_id"] = fw_id
params["mode"] = self.mode.value
params["vk"] = get_vk2(params, self.cltp.value)
params["cltp"] = self.cltp.value
params["cktp"] = self.cktp.value
params["rtd"] = self.rtd.value
if self.mode == self.MODE.FULL:
params["foot"] = 1
params["chnl"] = self.chnl.value
return url, params
def do_request(self, curef, fvver, tvver, fw_id):
"""Perform download request with given parameters."""
url, params = self.prep_request(curef, fvver, tvver, fw_id)
# print(repr(dict(params)))
req = self.sess.post(url, data=params)
if req.status_code == 200:
req.encoding = "utf-8" # Force encoding as server doesn't give one
self.write_dump(req.text)
return req.text
else:
print("REQUEST: " + repr(req))
print(repr(req.headers))
print(repr(req.text))
raise SystemExit
@staticmethod
def parse_request(xmlstr):
"""Parse output of ``do_request``."""
root = ElementTree.fromstring(xmlstr)
file = root.find("FILE_LIST").find("FILE")
fileid = file.find("FILE_ID").text
fileurl = file.find("DOWNLOAD_URL").text
s3_fileurl_node = file.find("S3_DOWNLOAD_URL")
s3_fileurl = ""
if s3_fileurl_node:
s3_fileurl = s3_fileurl_node.text
slave_list = root.find("SLAVE_LIST").findall("SLAVE")
enc_list = root.find("SLAVE_LIST").findall("ENCRYPT_SLAVE")
s3_slave_list = root.find("SLAVE_LIST").findall("S3_SLAVE")
slaves = [s.text for s in slave_list]
encslaves = [s.text for s in enc_list]
s3_slaves = [s.text for s in s3_slave_list]
return fileid, fileurl, slaves, encslaves, s3_fileurl, s3_slaves

View File

@ -1,14 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""XML tools."""
import xml.dom.minidom
def pretty_xml(xmlstr):
"""Prettify input XML with ``xml.dom.minidom``."""
mdx = xml.dom.minidom.parseString(xmlstr)
return mdx.toprettyxml(indent=" ")