diff --git a/tclcheck.py b/tclcheck.py index 88a273c..5405049 100755 --- a/tclcheck.py +++ b/tclcheck.py @@ -9,19 +9,18 @@ import os import random import sys -import tcllib -import tcllib.argparser +from tcllib import argparser from tcllib.devices import Device -from tcllib.xmltools import pretty_xml +from tcllib.dumpmgr import write_info_if_dumps_found +from tcllib.requests import RequestRunner, CheckRequest, DownloadRequest, \ + ChecksumRequest, EncryptHeaderRequest, ServerSelector -fc = tcllib.FotaCheck() - dpdesc = """ Checks for the latest FULL updates for the specified PRD number or for an OTA from the version specified as fvver. """ -dp = tcllib.argparser.DefaultParser(__file__, dpdesc) +dp = argparser.DefaultParser(__file__, dpdesc) dp.add_argument("prd", nargs=1, help="CU Reference #, e.g. PRD-63117-011") dp.add_argument("fvver", nargs="?", help="Firmware version to check for OTA updates, e.g. AAM481 (omit to run FULL check)", default="AAA000") dp.add_argument("-i", "--imei", help="use specified IMEI instead of default", type=str) @@ -68,38 +67,59 @@ else: print("Mode: {}".format(dev.mode)) print("CLTP: {}".format(dev.cltp)) -fc.reset_session(dev) -check_xml = fc.do_check(dev) -print(pretty_xml(check_xml)) -curef, fv, tv, fw_id, fileid, fn, fsize, fhash = fc.parse_check(check_xml) +runner = RequestRunner(ServerSelector()) -req_xml = fc.do_request(curef, fv, tv, fw_id) -print(pretty_xml(req_xml)) -fileid, fileurl, slaves, encslaves, s3_fileurl, s3_slaves = fc.parse_request(req_xml) +# Check for update +chk = CheckRequest(dev) +runner.run(chk) +if not chk.success: + print("{}".format(chk.error)) + sys.exit(2) +chkres = chk.get_result() +print(chkres.pretty_xml()) -if encslaves: - chksum_xml = fc.do_checksum(random.choice(encslaves), fileurl, fileurl) - print(pretty_xml(chksum_xml)) - file_addr, sha1_body, sha1_enc_footer, sha1_footer = fc.parse_checksum(chksum_xml) +# Request download +dlr = DownloadRequest(dev, chkres.tvver, chkres.fw_id) +runner.run(dlr) +if not dlr.success: + print("{}".format(dlr.error)) + sys.exit(3) +dlrres = dlr.get_result() +print(dlrres.pretty_xml()) -for s in slaves: - print("http://{}{}".format(s, fileurl)) +if dlrres.encslaves: + encrunner = RequestRunner(ServerSelector(dlrres.encslaves), https=False) + cks = ChecksumRequest(dlrres.fileurl, dlrres.fileurl) + encrunner.run(cks) + if not cks.success: + print("{}".format(cks.error)) + sys.exit(4) + cksres = cks.get_result() + print(cksres.pretty_xml()) -for s in s3_slaves: - print("http://{}{}".format(s, s3_fileurl)) +for s in dlrres.slaves: + print("http://{}{}".format(s, dlrres.fileurl)) + +for s in dlrres.s3_slaves: + print("http://{}{}".format(s, dlrres.s3_fileurl)) if dev.mode == dev.MODE_STATES["FULL"]: - header = fc.do_encrypt_header(random.choice(encslaves), fileurl) - headname = "header_{}.bin".format(tv) + hdr = EncryptHeaderRequest(dlrres.fileurl) + encrunner.run(hdr) + if not hdr.success: + print("{}".format(hdr.error)) + sys.exit(5) + hdrres = hdr.get_result() + headname = "header_{}.bin".format(chkres.tvver) headdir = "headers" if not os.path.exists(headdir): os.makedirs(headdir) - if len(header) == 4194320: + if len(hdrres.rawdata) == 4194320: # TODO: Check sha1sum print("Header length check passed. Writing to {}.".format(headname)) with open(os.path.join(headdir, headname), "wb") as f: - f.write(header) + f.write(hdrres.rawdata) else: - print("Header length invalid ({}).".format(len(header))) + print("Header length invalid ({}).".format(len(hdrres.rawdata))) -tcllib.FotaCheck.write_info_if_dumps_found() +write_info_if_dumps_found() diff --git a/tclcheck_allfull.py b/tclcheck_allfull.py index 679dd2a..1b76ae0 100644 --- a/tclcheck_allfull.py +++ b/tclcheck_allfull.py @@ -7,22 +7,19 @@ import sys -from requests.exceptions import RequestException - -import tcllib -import tcllib.argparser -from tcllib import ansi, devlist +from tcllib import ansi, argparser, devlist from tcllib.devices import DesktopDevice +from tcllib.dumpmgr import write_info_if_dumps_found +from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector dev = DesktopDevice() -fc = tcllib.FotaCheck() dpdesc = """ Checks for the latest FULL updates for all PRD numbers or only for the PRD specified as prd. """ -dp = tcllib.argparser.DefaultParser(__file__, dpdesc) +dp = argparser.DefaultParser(__file__, dpdesc) dp.add_argument("-p", "--prd", help="CU Reference # to filter scan results", dest="tocheck", nargs="?", default=None, metavar="PRD") dp.add_argument("-l", "--local", help="Force using local database", dest="local", action="store_true", default=False) args = dp.parse_args(sys.argv[1:]) @@ -34,27 +31,30 @@ prds = devlist.get_devicelist(local=args.local) print("List of latest FULL firmware by PRD:") +runner = RequestRunner(ServerVoteSelector()) +runner.max_tries = 20 + for prd, variant in prds.items(): model = variant["variant"] lastver = variant["last_full"] - if prdcheck in prd: - try: - dev.curef = prd - fc.reset_session(dev) - check_xml = fc.do_check(dev, max_tries=20) - curef, fv, tv, fw_id, fileid, fn, fsize, fhash = fc.parse_check(check_xml) - txt_tv = tv - if tv != lastver: - txt_tv = "{} (old: {} / OTA: {})".format( - ansi.CYAN + txt_tv + ansi.RESET, - ansi.CYAN_DARK + variant["last_full"] + ansi.RESET, - variant["last_ota"] - ) - else: - fc.delete_last_dump() - print("{}: {} {} ({})".format(prd, txt_tv, fhash, model)) - except RequestException as e: - print("{}: {}".format(prd, str(e))) - continue + if not prdcheck in prd: + continue + dev.curef = prd + chk = CheckRequest(dev) + runner.run(chk) + if chk.success: + result = chk.get_result() + txt_tv = result.tvver + if result.tvver != lastver: + txt_tv = "{} (old: {} / OTA: {})".format( + ansi.CYAN + txt_tv + ansi.RESET, + ansi.CYAN_DARK + variant["last_full"] + ansi.RESET, + variant["last_ota"] + ) + else: + result.delete_dump() + print("{}: {} {} ({})".format(prd, txt_tv, result.filehash, model)) + else: + print("{}: {}".format(prd, str(chk.error))) -tcllib.FotaCheck.write_info_if_dumps_found() +write_info_if_dumps_found() diff --git a/tclcheck_allota.py b/tclcheck_allota.py index b406cef..044726a 100644 --- a/tclcheck_allota.py +++ b/tclcheck_allota.py @@ -7,22 +7,19 @@ import sys -from requests.exceptions import RequestException - -import tcllib -import tcllib.argparser -from tcllib import ansi, devlist +from tcllib import ansi, argparser, devlist from tcllib.devices import MobileDevice +from tcllib.dumpmgr import write_info_if_dumps_found +from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector dev = MobileDevice() -fc = tcllib.FotaCheck() dpdesc = """ Checks for the latest OTA updates for all PRD numbers or only for the PRD specified as prd. Initial software version can be specified with forcever. """ -dp = tcllib.argparser.DefaultParser(__file__, dpdesc) +dp = argparser.DefaultParser(__file__, dpdesc) dp.add_argument("forcever", help="Initial software version to check for OTA updates, e.g. AAM481", nargs="?", default=None) dp.add_argument("-p", "--prd", help="CU Reference # to filter scan results", dest="tocheck", nargs="?", default=None, metavar="PRD") dp.add_argument("-l", "--local", help="Force using local database", dest="local", action="store_true", default=False) @@ -40,23 +37,26 @@ prds = devlist.get_devicelist(local=args.local) print("List of latest OTA firmware{} by PRD:".format(force_ver_text)) +runner = RequestRunner(ServerVoteSelector()) +runner.max_tries = 20 + for prd, variant in prds.items(): model = variant["variant"] lastver = variant["last_ota"] lastver = variant["last_full"] if lastver is None else lastver if args.forcever is not None: lastver = args.forcever - if prdcheck in prd: - try: - dev.curef = prd - dev.fwver = lastver - fc.reset_session(dev) - check_xml = fc.do_check(dev, max_tries=20) - curef, fv, tv, fw_id, fileid, fn, fsize, fhash = fc.parse_check(check_xml) - versioninfo = ansi.YELLOW_DARK + fv + ansi.RESET + " ⇨ " + ansi.YELLOW + tv + ansi.RESET + " (FULL: {})".format(variant["last_full"]) - print("{}: {} {} ({})".format(prd, versioninfo, fhash, model)) - except RequestException as e: - print("{} ({}): {}".format(prd, lastver, str(e))) - continue + if not prdcheck in prd: + continue + dev.curef = prd + dev.fwver = lastver + chk = CheckRequest(dev) + runner.run(chk) + if chk.success: + result = chk.get_result() + versioninfo = ansi.YELLOW_DARK + result.fvver + ansi.RESET + " ⇨ " + ansi.YELLOW + result.tvver + ansi.RESET + " (FULL: {})".format(variant["last_full"]) + print("{}: {} {} ({})".format(prd, versioninfo, result.filehash, model)) + else: + print("{} ({}): {}".format(prd, lastver, chk.error)) -tcllib.FotaCheck.write_info_if_dumps_found() +write_info_if_dumps_found() diff --git a/tclcheck_findprd.py b/tclcheck_findprd.py index 46950e4..3d8a168 100755 --- a/tclcheck_findprd.py +++ b/tclcheck_findprd.py @@ -8,22 +8,17 @@ import collections import sys -from requests.exceptions import RequestException, Timeout +from tcllib import ansi, argparser, devlist +from tcllib.devices import DesktopDevice +from tcllib.dumpmgr import write_info_if_dumps_found +from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector -import tcllib -import tcllib.argparser -from tcllib import ansi, devlist -from tcllib.devices import DesktopDevice, MobileDevice - - -dev = DesktopDevice() -fc = tcllib.FotaCheck() dpdesc = """ Finds new PRD numbers for all known variants, or specified variants with tocheck. Scan range can be set by floor and ceiling switches. """ -dp = tcllib.argparser.DefaultParser(__file__, dpdesc) +dp = argparser.DefaultParser(__file__, dpdesc) dp.add_argument("tocheck", help="CU Reference # to filter scan results", nargs="?", default=None) dp.add_argument("-f", "--floor", help="Beginning of scan range", dest="floor", nargs="?", type=int, default=0) dp.add_argument("-c", "--ceiling", help="End of scan range", dest="ceiling", nargs="?", type=int, default=999) @@ -58,6 +53,11 @@ if args.tocheck is not None: if not prddict: prddict[args.tocheck] = [] +dev = DesktopDevice() + +runner = RequestRunner(ServerVoteSelector(), https=False) +runner.max_tries = 20 + for center in sorted(prddict.keys()): tails = [int(i) for i in prddict[center]] safes = [g for g in range(floor, ceiling) if g not in tails] @@ -69,15 +69,13 @@ for center in sorted(prddict.keys()): done_count += 1 print("Checking {} ({}/{})".format(curef, done_count, total_count)) print(ansi.UP_DEL, end="") - try: - dev.curef = curef - fc.reset_session(dev) - check_xml = fc.do_check(dev, https=False, max_tries=20) - curef, fv, tv, fw_id, fileid, fn, fsize, fhash = fc.parse_check(check_xml) - txt_tv = tv - print("{}: {} {}".format(curef, txt_tv, fhash)) - except (SystemExit, RequestException, Timeout) as e: - continue + dev.curef = curef + chk = CheckRequest(dev) + runner.run(chk) + if chk.success: + chkres = chk.get_result() + txt_tv = chkres.tvver + print("{}: {} {}".format(curef, txt_tv, chkres.filehash)) print("Scan complete.") -tcllib.FotaCheck.write_info_if_dumps_found() +write_info_if_dumps_found() diff --git a/tclcheck_findprd2.py b/tclcheck_findprd2.py index 194d0ec..a166ef9 100644 --- a/tclcheck_findprd2.py +++ b/tclcheck_findprd2.py @@ -7,25 +7,20 @@ import sys -from requests.exceptions import RequestException, Timeout - -import tcllib -import tcllib.argparser -from tcllib import ansi, devlist -from tcllib.devices import DesktopDevice, MobileDevice +from tcllib import ansi, argparser, devlist +from tcllib.devices import DesktopDevice +from tcllib.dumpmgr import write_info_if_dumps_found +from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector # Variants to scan for SCAN_VARIANTS = ["001", "003", "009", "010", "700"] -dev = DesktopDevice() -fc = tcllib.FotaCheck() - dpdesc = """ Finds new PRD numbers for a range of variants. Scan range can be set by floor and ceiling switches. """ -dp = tcllib.argparser.DefaultParser(__file__, dpdesc) +dp = argparser.DefaultParser(__file__, dpdesc) dp.add_argument("floor", nargs="?", help="Model number to start with", type=int, default=63116) dp.add_argument("ceiling", nargs="?", help="Model number to end with", type=int, default=99999) dp.add_argument("-l", "--local", help="Force using local database", dest="local", action="store_true", default=False) @@ -50,21 +45,24 @@ to_scan = scan_list - known_centers total_count = len(to_scan) * len(SCAN_VARIANTS) done_count = 0 +dev = DesktopDevice() + +runner = RequestRunner(ServerVoteSelector(), https=False) +runner.max_tries = 20 + for center in to_scan: for j in SCAN_VARIANTS: curef = "PRD-{:05}-{:3}".format(center, j) done_count += 1 print("Checking {} ({}/{})".format(curef, done_count, total_count)) print(ansi.UP_DEL, end="") - try: - dev.curef = curef - fc.reset_session(dev) - check_xml = fc.do_check(dev, https=False, max_tries=20) - curef, fv, tv, fw_id, fileid, fn, fsize, fhash = fc.parse_check(check_xml) - txt_tv = tv - print("{}: {} {}".format(curef, txt_tv, fhash)) - except (SystemExit, RequestException, Timeout) as e: - continue + dev.curef = curef + chk = CheckRequest(dev) + runner.run(chk) + if chk.success: + chkres = chk.get_result() + txt_tv = chkres.tvver + print("{}: {} {}".format(curef, txt_tv, chkres.filehash)) print("Scan complete.") -tcllib.FotaCheck.write_info_if_dumps_found() +write_info_if_dumps_found() diff --git a/tclcheck_findver.py b/tclcheck_findver.py index 301e66f..808e9ca 100755 --- a/tclcheck_findver.py +++ b/tclcheck_findver.py @@ -7,32 +7,28 @@ import sys -from requests.exceptions import RequestException, Timeout +from tcllib import ansi, argparser, devlist +from tcllib.devices import MobileDevice +from tcllib.dumpmgr import write_info_if_dumps_found +from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector -import tcllib -import tcllib.argparser -from tcllib import ansi -from tcllib.devices import DesktopDevice, MobileDevice - - -dev = MobileDevice() -fc = tcllib.FotaCheck() dpdesc = """ Finds all valid OTA updates for a given PRD. Scan range can be set by startver and endver switches. """ -dp = tcllib.argparser.DefaultParser(__file__, dpdesc) +dp = argparser.DefaultParser(__file__, dpdesc) dp.add_argument("prd", help="CU Reference #, e.g. PRD-63117-011") dp.add_argument("startver", help="Beginning of scan range", nargs="?", default="AAA000") dp.add_argument("endver", help="End of scan range", nargs="?", default="AAZ999") args = dp.parse_args(sys.argv[1:]) -fc.curef = args.prd +dev = MobileDevice() +dev.curef = args.prd start_ver = args.startver end_ver = args.endver -print("Valid firmwares for model {} (between {} and {}):".format(fc.curef, start_ver, end_ver)) +print("Valid firmwares for model {} (between {} and {}):".format(dev.curef, start_ver, end_ver)) cur_ver = start_ver allvers = [] @@ -53,21 +49,22 @@ while True: break cur_ver = "{:3}{:03d}".format("".join(letters), num) +runner = RequestRunner(ServerVoteSelector(), https=False) +runner.max_tries = 20 + done_count = 0 total_count = len(allvers) for fv in allvers: done_count += 1 print("Checking {} ({}/{})".format(fv, done_count, total_count)) print(ansi.UP_DEL, end="") - try: - dev.fwver = fv - fc.reset_session(dev) - check_xml = fc.do_check(dev, https=False, max_tries=20) - curef, fv, tv, fw_id, fileid, fn, fsize, fhash = fc.parse_check(check_xml) - txt_tv = tv - print("{}: {} ⇨ {} {}".format(curef, fv, txt_tv, fhash)) - except (SystemExit, RequestException, Timeout) as e: - continue + dev.fwver = fv + chk = CheckRequest(dev) + runner.run(chk) + if chk.success: + chkres = chk.get_result() + txt_tv = chkres.tvver + print("{}: {} ⇨ {} {}".format(dev.curef, fv, txt_tv, chkres.filehash)) print("Scan complete.") -tcllib.FotaCheck.write_info_if_dumps_found() +write_info_if_dumps_found() diff --git a/tclcheck_gapfill.py b/tclcheck_gapfill.py index b695c3c..9e11604 100644 --- a/tclcheck_gapfill.py +++ b/tclcheck_gapfill.py @@ -6,20 +6,18 @@ """Query existence of missing OTAs.""" import json - import requests -from requests.exceptions import RequestException -import tcllib -from tcllib.devices import DesktopDevice, MobileDevice +from tcllib import ansi +from tcllib.devices import MobileDevice +from tcllib.dumpmgr import write_info_if_dumps_found +from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector # 1. Fetch list of missing OTAs (e.g. from ancient versions to current) # 2. Query updates from FOTA servers (and store XML) # (3. Upload will be done manually with upload_logs.py) -dev = MobileDevice() -fc = tcllib.FotaCheck() print("Loading list of missing OTAs.") versions_json = requests.get("https://tclota.birth-online.de/json_otaversions.php").text @@ -30,22 +28,25 @@ for i in versions: print("Got {} devices and a total of {} missing OTAs.".format(len(versions), num_versions)) +dev = MobileDevice() + +runner = RequestRunner(ServerVoteSelector()) +runner.max_tries = 20 + num_item = 1 for prd, data in versions.items(): print("{}:".format(prd), end="", flush=True) for ver in data["missing_froms"]: print(" {}".format(ver), end="", flush=True) - try: - dev.curef = prd - dev.fwver = ver - fc.reset_session(dev) - check_xml = fc.do_check(dev, max_tries=20) - curef, fv, tv, fw_id, fileid, fn, fsize, fhash = fc.parse_check(check_xml) + dev.curef = prd + dev.fwver = ver + chk = CheckRequest(dev) + runner.run(chk) + if chk.success: print("✔", end="", flush=True) - except RequestException as e: + num_item += 1 + else: print("✖", end="", flush=True) - continue - num_item += 1 print("") -tcllib.FotaCheck.write_info_if_dumps_found() +write_info_if_dumps_found() diff --git a/tclchksum.py b/tclchksum.py index f9c7557..18a3d0c 100644 --- a/tclchksum.py +++ b/tclchksum.py @@ -5,14 +5,10 @@ """Return checksum for given firmware.""" -import random import sys -import tcllib -import tcllib.argparser -from tcllib.xmltools import pretty_xml - -fc = tcllib.FotaCheck() +from tcllib import argparser +from tcllib.requests import RequestRunner, ChecksumRequest, ServerSelector encslaves = [ "54.238.56.196", @@ -29,7 +25,7 @@ encslaves = [ dpdesc = """ Returns the checksum for a given firmware URI. """ -dp = tcllib.argparser.DefaultParser(__file__, dpdesc) +dp = argparser.DefaultParser(__file__, dpdesc) dp.add_argument("uri", help="URI to firmware, starts with '/body/...'") args = dp.parse_args(sys.argv[1:]) @@ -38,6 +34,13 @@ fileurl = args.uri # /body/ce570ddc079e2744558f191895e524d02a60476f/32/268932 #fileurl = "/body/ce570ddc079e2744558f191895e524d02a60476f/2c23717bb747f3c321195419f451de52efa8ea51/263790/268932" -chksum_xml = fc.do_checksum(random.choice(encslaves), fileurl, fileurl) -print(pretty_xml(chksum_xml)) -file_addr, sha1_body, sha1_enc_footer, sha1_footer = fc.parse_checksum(chksum_xml) +runner = RequestRunner(ServerSelector(encslaves), https=False) + +cks = ChecksumRequest(fileurl, fileurl) +runner.run(cks) + +if not cks.success: + print("{}".format(cks.error)) + sys.exit(4) +cksres = cks.get_result() +print(cksres.pretty_xml()) diff --git a/tcldown.py b/tcldown.py index 29a46cc..73fb3dd 100644 --- a/tcldown.py +++ b/tcldown.py @@ -6,22 +6,19 @@ """Download a given firmware file.""" import os -import random import sys -import tcllib -import tcllib.argparser +from tcllib import argparser from tcllib.devices import DesktopDevice -from tcllib.xmltools import pretty_xml +from tcllib.dumpmgr import write_info_if_dumps_found +from tcllib.requests import RequestRunner, CheckRequest, DownloadRequest, \ + ChecksumRequest, EncryptHeaderRequest, ServerSelector -fc = tcllib.FotaCheck() -dev = DesktopDevice() - dpdesc = """ Downloads the given firmware file. """ -dp = tcllib.argparser.DefaultParser(__file__, dpdesc) +dp = argparser.DefaultParser(__file__, dpdesc) dp.add_argument("prd", nargs=1, help="CU Reference #, e.g. PRD-63117-011") dp.add_argument("targetversion", nargs=1, help="Firmware version to download, e.g. AAN990") dp.add_argument("fwid", nargs=1, help="Numeric firmware file id, e.g. 268932") @@ -32,6 +29,7 @@ dp.add_argument("--rawmode", help="override --mode with raw value (2=OTA, 4=FULL dp.add_argument("--rawcltp", help="override --type with raw value (10=MOBILE, 2010=DESKTOP)", metavar="CLTP") args = dp.parse_args(sys.argv[1:]) +dev = DesktopDevice() def sel_mode(defaultmode, rawval): """Handle custom mode.""" @@ -65,30 +63,42 @@ dev.cltp = sel_cltp(args.type, args.rawcltp) print("Mode: {}".format(dev.mode)) print("CLTP: {}".format(dev.cltp)) -fv = dev.fwver +runner = RequestRunner(ServerSelector()) +runner.max_tries = 20 + tv = args.targetversion[0] fw_id = args.fwid[0] -req_xml = fc.do_request(dev.curef, fv, tv, fw_id) -print(pretty_xml(req_xml)) -fileid, fileurl, slaves, encslaves, s3_fileurl, s3_slaves = fc.parse_request(req_xml) +dlr = DownloadRequest(dev, tv, fw_id) +runner.run(dlr) +if not dlr.success: + print("ERROR: {}".format(dlr.error)) + sys.exit(3) -for s in slaves: - print("http://{}{}".format(s, fileurl)) +dlrres = dlr.get_result() +print(dlrres.pretty_xml()) -for s in s3_slaves: - print("http://{}{}".format(s, s3_fileurl)) +for s in dlrres.slaves: + print("http://{}{}".format(s, dlrres.fileurl)) + +for s in dlrres.s3_slaves: + print("http://{}{}".format(s, dlrres.s3_fileurl)) if dev.mode == dev.MODE_STATES["FULL"]: - header = fc.do_encrypt_header(random.choice(encslaves), fileurl) - headname = "header_{}.bin".format(tv) - headdir = "headers" - if not os.path.exists(headdir): - os.makedirs(headdir) - if len(header) == 4194320: - print("Header length check passed. Writing to {}.".format(headname)) - with open(os.path.join(headdir, headname), "wb") as f: - f.write(header) - else: - print("Header length invalid ({}).".format(len(header))) + encrun = RequestRunner(ServerSelector(dlrres.encslaves), https=False) + encrun.max_tries = 20 + hdr = EncryptHeaderRequest(dlrres.fileurl) + encrun.run(hdr) + if hdr.success: + hdrres = hdr.get_result() + headname = "header_{}.bin".format(tv) + headdir = "headers" + if not os.path.exists(headdir): + os.makedirs(headdir) + if len(hdrres.rawdata) == 4194320: + print("Header length check passed. Writing to {}.".format(headname)) + with open(os.path.join(headdir, headname), "wb") as f: + f.write(hdrres.rawdata) + else: + print("Header length invalid ({}).".format(len(hdrres.rawdata))) -tcllib.FotaCheck.write_info_if_dumps_found() +write_info_if_dumps_found() diff --git a/tcllib/__init__.py b/tcllib/__init__.py index 96f84b6..9d6eeee 100644 --- a/tcllib/__init__.py +++ b/tcllib/__init__.py @@ -1,35 +1,9 @@ -#!/usr/bin/env python3 # -*- coding: utf-8 -*- -# pylint: disable=C0111,C0326,C0103 - """Library for TCL API work and related functions.""" -import requests - -from . import (dumpmgr, servervote, tclcheck, tclchecksum, tclencheader, - tclrequest) - - -class FotaCheck( - tclcheck.TclCheckMixin, - tclrequest.TclRequestMixin, - tclchecksum.TclChecksumMixin, - tclencheader.TclEncHeaderMixin, - servervote.ServerVoteMixin, - dumpmgr.DumpMgrMixin -): - """Main API handler class.""" - - def __init__(self): - """Handle mixins and populate variables.""" - super().__init__() - self.reset_session() - - def reset_session(self, device=None): - """Reset everything to default.""" - self.g2master = self.get_master_server() - self.sess = requests.Session() - if device: - self.sess.headers.update({"User-Agent": device.ua}) - return self.sess +from .ansi import * +from .argparser import * +from .devices import * +from .devlist import * +from .dumpmgr import * diff --git a/tcllib/dumpmgr.py b/tcllib/dumpmgr.py index a24897e..e6bf993 100644 --- a/tcllib/dumpmgr.py +++ b/tcllib/dumpmgr.py @@ -15,23 +15,32 @@ from math import floor from . import ansi -class DumpMgrMixin: - """A mixin component for XML dump management.""" +def get_timestamp_random(): + """Generate timestamp + random part to avoid collisions.""" + millis = floor(time.time() * 1000) + tail = "{:06d}".format(random.randint(0, 999999)) + return "{}_{}".format(str(millis), tail) + +def write_info_if_dumps_found(): + """Notify user to upload dumps if present.""" + # To disable this info, uncomment the following line. + # return + files = glob.glob(os.path.normpath("logs/*.xml")) + if files: + print() + print("{}There are {} logs collected in the logs/ directory.{} Please consider uploading".format(ansi.YELLOW, len(files), ansi.RESET)) + print("them to https://tclota.birth-online.de/ by running {}./upload_logs.py{}.".format(ansi.CYAN, ansi.RESET)) + +class DumpMgr: + """A class for XML dump management.""" def __init__(self): """Populate dump file name.""" self.last_dump_filename = None - @staticmethod - def get_timestamp_random(): - """Generate timestamp + random part to avoid collisions.""" - millis = floor(time.time() * 1000) - tail = "{:06d}".format(random.randint(0, 999999)) - return "{}_{}".format(str(millis), tail) - def write_dump(self, data): """Write dump to file.""" - outfile = os.path.normpath("logs/{}.xml".format(self.get_timestamp_random())) + outfile = os.path.normpath("logs/{}.xml".format(get_timestamp_random())) if not os.path.exists(os.path.dirname(outfile)): try: os.makedirs(os.path.dirname(outfile)) @@ -47,14 +56,3 @@ class DumpMgrMixin: if self.last_dump_filename: os.unlink(self.last_dump_filename) self.last_dump_filename = None - - @staticmethod - def write_info_if_dumps_found(): - """Notify user to upload dumps if present.""" - # To disable this info, uncomment the following line. - # return - files = glob.glob(os.path.normpath("logs/*.xml")) - if files: - print() - print("{}There are {} logs collected in the logs/ directory.{} Please consider uploading".format(ansi.YELLOW, len(files), ansi.RESET)) - print("them to https://tclota.birth-online.de/ by running {}./upload_logs.py{}.".format(ansi.CYAN, ansi.RESET)) diff --git a/tcllib/requests/__init__.py b/tcllib/requests/__init__.py new file mode 100644 index 0000000..7872868 --- /dev/null +++ b/tcllib/requests/__init__.py @@ -0,0 +1,8 @@ +# -*- coding: utf-8 -*- + +from .checkrequest import CheckRequest +from .downloadrequest import DownloadRequest +from .checksumrequest import ChecksumRequest +from .encryptheaderrequest import EncryptHeaderRequest +from .runner import * +from .serverselector import * diff --git a/tcllib/requests/checkrequest.py b/tcllib/requests/checkrequest.py new file mode 100644 index 0000000..353b495 --- /dev/null +++ b/tcllib/requests/checkrequest.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- + +from collections import OrderedDict +from .. import devices +from .tclrequest import TclRequest +from .tclresult import CheckResult + +class CheckRequest(TclRequest): + def __init__(self, device: devices.Device): + super().__init__() + self.uri = "/check.php" + self.method = "GET" + self.device = device + + def get_headers(self): + return {"User-Agent": self.device.ua} + + def get_params(self): + params = OrderedDict() + params["id"] = self.device.imei + params["curef"] = self.device.curef + params["fv"] = self.device.fwver + params["mode"] = self.device.mode + params["type"] = self.device.type + params["cltp"] = self.device.cltp + params["cktp"] = self.device.cktp + params["rtd"] = self.device.rtd + params["chnl"] = self.device.chnl + #params["osvs"] = self.device.osvs + #params["ckot"] = self.device.ckot + return params + + def is_done(self, http_status: int, contents: str) -> bool: + ok_states = { + 204: "No update available.", + 404: "No data for requested CUREF/FV combination.", + } + if http_status == 200: + self.response = contents + self.result = CheckResult(contents) + self.success = True + return True + elif http_status in ok_states: + self.error = ok_states[http_status] + self.success = False + return True + elif http_status not in [500, 502, 503]: + # Errors OTHER than 500, 502 or 503 are probably + # errors where we don't need to retry + self.error ="HTTP {}.".format(http_status) + self.success = False + return True + return False + +# Check requests have 4 possible outcomes: +# 1. HTTP 200 with XML data - our desired info +# 2. HTTP 204 - means: no newer update available +# 3. HTTP 404 - means: invalid device or firmware version +# 4. anything else: server problem (esp. 500, 502, 503) diff --git a/tcllib/requests/checksumrequest.py b/tcllib/requests/checksumrequest.py new file mode 100644 index 0000000..2ec2f89 --- /dev/null +++ b/tcllib/requests/checksumrequest.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- + +from collections import OrderedDict +import json +from .. import credentials, devices +from .tclrequest import TclRequest +from .tclresult import ChecksumResult + +class ChecksumRequest(TclRequest): + def __init__(self, address, file_uri): + super().__init__() + # NOTE: THIS HAS TO BE RUN ON AN ENCSLAVE + self.uri = "/checksum.php" + self.method = "POST" + self.address = address + self.file_uri = file_uri + + def get_headers(self): + return {"User-Agent": "tcl"} + + def get_params(self): + params = OrderedDict() + params.update(credentials.get_creds2()) + payload = {self.address: self.file_uri} + payload_json = json.dumps(payload) + params["address"] = bytes(payload_json, "utf-8") + return params + + def is_done(self, http_status: int, contents: str) -> bool: + if http_status == 200: + # 2abfa6f6507044fec995efede5d818e62a0b19b5 means ERROR (invalid ADDRESS!) + if "2abfa6f6507044fec995efede5d818e62a0b19b5" in contents: + self.error = "INVALID URI: {}".format(self.file_uri) + self.success = False + return True + self.response = contents + self.result = ChecksumResult(contents) + self.success = True + return True + self.error = "HTTP {}".format(http_status) + self.success = False + return True diff --git a/tcllib/requests/downloadrequest.py b/tcllib/requests/downloadrequest.py new file mode 100644 index 0000000..345c247 --- /dev/null +++ b/tcllib/requests/downloadrequest.py @@ -0,0 +1,82 @@ +# -*- coding: utf-8 -*- + +import binascii +import hashlib +import random +import time +import zlib +from collections import OrderedDict +from math import floor +from .. import devices +from .tclrequest import TclRequest +from .tclresult import DownloadResult + + +VDKEY_B64Z = b"eJwdjwEOwDAIAr8kKFr//7HhmqXp8AIIDrYAgg8byiUXrwRJRXja+d6iNxu0AhUooDCN9rd6rDLxmGIakUVWo3IGCTRWqCAt6X4jGEIUAxgN0eYWnp+LkpHQAg/PsO90ELsy0Npm/n2HbtPndFgGEV31R9OmT4O4nrddjc3Qt6nWscx7e+WRHq5UnOudtjw5skuV09pFhvmqnOEIs4ljPeel1wfLYUF4\n" + + +def get_salt(): + """Generate cryptographic salt.""" + millis = floor(time.time() * 1000) + tail = "{:06d}".format(random.randint(0, 999999)) + return "{}{}".format(str(millis), tail) + +def get_vk2(params_dict, cltp): + """Generate salted hash of API parameters.""" + params_dict["cltp"] = cltp + query = "" + for key, val in params_dict.items(): + if query: + query += "&" + query += key + "=" + str(val) + vdk = zlib.decompress(binascii.a2b_base64(VDKEY_B64Z)) + query += vdk.decode("utf-8") + engine = hashlib.sha1() + engine.update(bytes(query, "utf-8")) + hexhash = engine.hexdigest() + return hexhash + +class DownloadRequest(TclRequest): + def __init__(self, device: devices.Device, tvver: str, fw_id: str): + super().__init__() + self.uri = "/download_request.php" + self.method = "POST" + self.device = device + self.tvver = tvver + self.fw_id = fw_id + + def get_headers(self): + return {"User-Agent": self.device.ua} + + def get_params(self): + params = OrderedDict() + params["id"] = self.device.imei + params["salt"] = get_salt() + params["curef"] = self.device.curef + params["fv"] = self.device.fwver + params["tv"] = self.tvver + params["type"] = self.device.type + params["fw_id"] = self.fw_id + params["mode"] = self.device.mode + params["vk"] = get_vk2(params, self.device.cltp) + params["cltp"] = self.device.cltp + params["cktp"] = self.device.cktp + params["rtd"] = self.device.rtd + if self.device.mode == self.device.MODE_STATES["FULL"]: + params["foot"] = 1 + params["chnl"] = self.device.chnl + return params + + def is_done(self, http_status: int, contents: str) -> bool: + if http_status == 200: + self.response = contents + self.result = DownloadResult(contents) + self.success = True + return True + elif http_status not in [500, 502, 503]: + # Errors OTHER than 500, 502 or 503 are probably + # errors where we don't need to retry + self.error = "HTTP {}".format(http_status) + self.success = False + return True + return False diff --git a/tcllib/requests/encryptheaderrequest.py b/tcllib/requests/encryptheaderrequest.py new file mode 100644 index 0000000..e78f06a --- /dev/null +++ b/tcllib/requests/encryptheaderrequest.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- + +from collections import OrderedDict +from .. import credentials, devices +from .tclrequest import TclRequest +from .tclresult import EncryptHeaderResult + +class EncryptHeaderRequest(TclRequest): + def __init__(self, file_uri): + super().__init__() + # NOTE: THIS HAS TO BE RUN ON AN ENCSLAVE + self.uri = "/encrypt_header.php" + self.rawmode = True + self.method = "POST" + self.file_uri = file_uri + + def get_headers(self): + return {"User-Agent": "tcl"} + + def get_params(self): + params = OrderedDict() + params.update(credentials.get_creds2()) + params["address"] = bytes(self.file_uri, "utf-8") + return params + + def is_done(self, http_status: int, contents: str) -> bool: + # Expect "HTTP 206 Partial Content" response + if http_status == 206: + self.result = EncryptHeaderResult(contents) + self.success = True + return True + self.error = "HTTP {}".format(http_status) + self.success = False + return True diff --git a/tcllib/requests/http.py b/tcllib/requests/http.py new file mode 100644 index 0000000..2b2821a --- /dev/null +++ b/tcllib/requests/http.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- + +import requests +from collections import OrderedDict + +class TimeoutException(Exception): + pass + +class HttpRequest: + """Provides all generic features for making HTTP GET requests""" + def __init__(self, url, timeout=10): + self.url = url + self.params = OrderedDict() + self.timeout = timeout + self.headers = {} + + def reset_session(self): + """Reset everything to default.""" + self.sess = requests.Session() + self.sess.headers.update(self.headers) + + def run(self): + """Run query.""" + try: + req = self.sess.get(self.url, params=self.params, timeout=self.timeout) + except requests.exceptions.Timeout as e: + raise TimeoutException(e) + return req + +class HttpPostRequest(HttpRequest): + """Provides all generic features for making HTTP POST requests""" + def run(self): + """Run query.""" + try: + req = self.sess.post(self.url, data=self.params, timeout=self.timeout) + except requests.exceptions.Timeout as e: + raise TimeoutException(e) + return req diff --git a/tcllib/requests/runner.py b/tcllib/requests/runner.py new file mode 100644 index 0000000..5569f89 --- /dev/null +++ b/tcllib/requests/runner.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- + +from .tclrequest import TclRequest +from . import http +from . import serverselector + +class UnknownMethodException(Exception): + pass + +class RequestRunner: + def __init__(self, server_selector: serverselector.ServerSelector, https=True): + self.server_selector = server_selector + self.protocol = "https://" if https else "http://" + self.max_tries = 5 + + def get_http(self, method="GET") -> http.HttpRequest: + """Returns the http class according to desired method.""" + if method == "GET": + return http.HttpRequest + elif method == "POST": + return http.HttpPostRequest + raise UnknownMethodException("Unknown http method: {}".format(method)) + + def get_server(self) -> str: + """Returns a master server.""" + return self.server_selector.get_master_server() + + def run(self, query: TclRequest, timeout: int=10) -> bool: + """Runs the actual query.""" + for _ in range(0, self.max_tries): + url = "{}{}{}".format(self.protocol, self.get_server(), query.uri) + http_handler = self.get_http(query.method)(url, timeout) + http_handler.headers = query.get_headers() + http_handler.params = query.get_params() + http_handler.reset_session() + self.server_selector.hook_prerequest() + try: + req = http_handler.run() + if query.rawmode: + done = query.is_done(req.status_code, req.content) + else: + req.encoding = "utf-8" + done = query.is_done(req.status_code, req.text) + self.server_selector.hook_postrequest(done) + if done: + return done + except http.TimeoutException: + self.server_selector.hook_postrequest(False) + query.error = "Timeout." + query.error = "Max tries ({}) reached.".format(self.max_tries) + return False diff --git a/tcllib/requests/serverselector.py b/tcllib/requests/serverselector.py new file mode 100644 index 0000000..cd2236f --- /dev/null +++ b/tcllib/requests/serverselector.py @@ -0,0 +1,109 @@ +# -*- coding: utf-8 -*- + +# pylint: disable=C0111,C0326,C0103 + +"""Tools to sort API servers to find the least awful one.""" + +import numpy +import time + + +MASTER_SERVERS = [ + "g2master-us-east.tclclouds.com", + "g2master-us-west.tclclouds.com", + "g2master-eu-west.tclclouds.com", + "g2master-ap-south.tclclouds.com", + "g2master-ap-north.tclclouds.com", + "g2master-sa-east.tclclouds.com", +] + +class ServerSelector: + """Returns a random server to use.""" + + def __init__(self, server_list=None): + """Init stuff""" + if server_list: + self.server_list = server_list + else: + self.server_list = MASTER_SERVERS + self.last_server = None + + def get_master_server(self): + """Return a random server.""" + while True: + new_server = numpy.random.choice(self.server_list) + if new_server != self.last_server: + break + self.last_server = new_server + return new_server + + def hook_prerequest(self): + """Hook to be called before doing request""" + pass + + def hook_postrequest(self, successful: bool): + """Hook to be called after request finished""" + pass + +class ServerVoteSelector(ServerSelector): + """Tries to return faster servers more often.""" + + def __init__(self, server_list=None): + """Populate server list and weighting variables.""" + super().__init__(server_list) + self.servers_weights = [3] * len(self.server_list) + self.check_time_sum = 3 + self.check_time_count = 1 + + def get_master_server(self): + """Return weighted choice from server list.""" + weight_sum = 0 + for i in self.servers_weights: + weight_sum += i + numpy_weights = [] + for i in self.servers_weights: + numpy_weights.append(i/weight_sum) + self.last_server = numpy.random.choice(self.server_list, p=numpy_weights) + return self.last_server + + def master_server_downvote(self): + """Decrease weight of last chosen server.""" + idx = self.server_list.index(self.last_server) + if self.servers_weights[idx] > 1: + self.servers_weights[idx] -= 1 + + def master_server_upvote(self): + """Increase weight of last chosen server.""" + idx = self.server_list.index(self.last_server) + if self.servers_weights[idx] < 10: + self.servers_weights[idx] += 1 + + def check_time_add(self, duration): + """Record connection time.""" + self.check_time_sum += duration + self.check_time_count += 1 + + def check_time_avg(self): + """Return average connection time.""" + return self.check_time_sum / self.check_time_count + + def master_server_vote_on_time(self, last_duration): + """Change weight of a server based on average connection time.""" + avg_duration = self.check_time_avg() + if last_duration < avg_duration - 0.5: + self.master_server_upvote() + elif last_duration > avg_duration + 0.5: + self.master_server_downvote() + + def hook_prerequest(self): + """Hook to be called before doing request""" + self.reqtime_start = time.perf_counter() + + def hook_postrequest(self, successful: bool): + """Hook to be called after request finished""" + reqtime = time.perf_counter() - self.reqtime_start + self.check_time_add(reqtime) + if successful: + self.master_server_vote_on_time(reqtime) + else: + self.master_server_downvote() diff --git a/tcllib/requests/tclrequest.py b/tcllib/requests/tclrequest.py new file mode 100644 index 0000000..9d39607 --- /dev/null +++ b/tcllib/requests/tclrequest.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- + +from . import tclresult + +class TclRequest: + def __init__(self): + self.uri = "" + self.rawmode = False + self.response = None + self.result = None + self.error = None + self.success = False + + def get_headers(self): + return {} + + def get_params(self): + return {} + + def is_done(self, http_status: int, contents: str): + """Checks if query is done or needs retry.""" + return False + + def get_result(self) -> tclresult.TclResult: + """Returns Result object.""" + return self.result diff --git a/tcllib/requests/tclresult.py b/tcllib/requests/tclresult.py new file mode 100644 index 0000000..3c39f41 --- /dev/null +++ b/tcllib/requests/tclresult.py @@ -0,0 +1,68 @@ +# -*- coding: utf-8 -*- + +import xml.dom.minidom + +from defusedxml import ElementTree + +from .. import dumpmgr + + +class TclResult: + def __init__(self, xml: str): + self.raw_xml = xml + self.dumper = dumpmgr.DumpMgr() + self.dumper.write_dump(xml) + + def delete_dump(self): + self.dumper.delete_last_dump() + + def pretty_xml(self): + """Return prettified input XML with ``xml.dom.minidom``.""" + mdx = xml.dom.minidom.parseString(self.raw_xml) + return mdx.toprettyxml(indent=" ") + +class CheckResult(TclResult): + def __init__(self, xml: str): + super().__init__(xml) + root = ElementTree.fromstring(xml) + self.curef = root.find("CUREF").text + self.fvver = root.find("VERSION").find("FV").text + self.tvver = root.find("VERSION").find("TV").text + self.fw_id = root.find("FIRMWARE").find("FW_ID").text + fileinfo = root.find("FIRMWARE").find("FILESET").find("FILE") + self.fileid = fileinfo.find("FILE_ID").text + self.filename = fileinfo.find("FILENAME").text + self.filesize = fileinfo.find("SIZE").text + self.filehash = fileinfo.find("CHECKSUM").text + +class DownloadResult(TclResult): + def __init__(self, xml: str): + super().__init__(xml) + root = ElementTree.fromstring(xml) + file = root.find("FILE_LIST").find("FILE") + self.fileid = file.find("FILE_ID").text + self.fileurl = file.find("DOWNLOAD_URL").text + s3_fileurl_node = file.find("S3_DOWNLOAD_URL") + self.s3_fileurl = None + if s3_fileurl_node is not None: + self.s3_fileurl = s3_fileurl_node.text + slave_list = root.find("SLAVE_LIST").findall("SLAVE") + enc_list = root.find("SLAVE_LIST").findall("ENCRYPT_SLAVE") + s3_slave_list = root.find("SLAVE_LIST").findall("S3_SLAVE") + self.slaves = [s.text for s in slave_list] + self.encslaves = [s.text for s in enc_list] + self.s3_slaves = [s.text for s in s3_slave_list] + +class ChecksumResult(TclResult): + def __init__(self, xml: str): + super().__init__(xml) + root = ElementTree.fromstring(xml) + file = root.find("FILE_CHECKSUM_LIST").find("FILE") + self.file_addr = file.find("ADDRESS").text + self.sha1_enc_footer = file.find("ENCRYPT_FOOTER").text + self.sha1_footer = file.find("FOOTER").text + self.sha1_body = file.find("BODY").text + +class EncryptHeaderResult(TclResult): + def __init__(self, contents: str): + self.rawdata = contents diff --git a/tcllib/servervote.py b/tcllib/servervote.py deleted file mode 100644 index 8755cb9..0000000 --- a/tcllib/servervote.py +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -# pylint: disable=C0111,C0326,C0103 - -"""Tools to sort API servers to find the least awful one.""" - -import numpy - - -class ServerVoteMixin: - """A mixin component for server sorting.""" - - def __init__(self): - """Populate server list and weighting variables.""" - self.g2master = None - self.master_servers = [ - "g2master-us-east.tclclouds.com", - "g2master-us-west.tclclouds.com", - "g2master-eu-west.tclclouds.com", - "g2master-ap-south.tclclouds.com", - "g2master-ap-north.tclclouds.com", - "g2master-sa-east.tclclouds.com", - ] - self.master_servers_weights = [3] * len(self.master_servers) - self.check_time_sum = 3 - self.check_time_count = 1 - - def get_master_server(self): - """Return weighted choice from server list.""" - weight_sum = 0 - for i in self.master_servers_weights: - weight_sum += i - numpy_weights = [] - for i in self.master_servers_weights: - numpy_weights.append(i/weight_sum) - return numpy.random.choice(self.master_servers, p=numpy_weights) - - def master_server_downvote(self): - """Decrease weight of a server.""" - idx = self.master_servers.index(self.g2master) - if self.master_servers_weights[idx] > 1: - self.master_servers_weights[idx] -= 1 - - def master_server_upvote(self): - """Increase weight of a server.""" - idx = self.master_servers.index(self.g2master) - if self.master_servers_weights[idx] < 10: - self.master_servers_weights[idx] += 1 - - def check_time_add(self, duration): - """Record connection time.""" - self.check_time_sum += duration - self.check_time_count += 1 - - def check_time_avg(self): - """Return average connection time.""" - return self.check_time_sum / self.check_time_count - - def master_server_vote_on_time(self, last_duration): - """Change weight of a server based on average connection time.""" - avg_duration = self.check_time_avg() - if last_duration < avg_duration - 0.5: - self.master_server_upvote() - elif last_duration > avg_duration + 0.5: - self.master_server_downvote() diff --git a/tcllib/tclcheck.py b/tcllib/tclcheck.py deleted file mode 100644 index 32d483d..0000000 --- a/tcllib/tclcheck.py +++ /dev/null @@ -1,94 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -# pylint: disable=C0111,C0326,C0103 - -"""Tools to interface with TCL's update request API.""" - -import time -from collections import OrderedDict, defaultdict - -import requests -from defusedxml import ElementTree - -from .devices import Device - - -class TclCheckMixin: - """A mixin component for TCL's update request API.""" - - def prep_check_url(self, https=True): - """Prepare URL for update request.""" - protocol = "https://" if https else "http://" - url = protocol + self.g2master + "/check.php" - return url - - def prep_check(self, device: Device, https=True): - """Prepare URL and parameters for update request.""" - url = self.prep_check_url(https) - params = OrderedDict() - params["id"] = device.imei - params["curef"] = device.curef - params["fv"] = device.fwver - params["mode"] = device.mode - params["type"] = device.type - params["cltp"] = device.cltp - params["cktp"] = device.cktp - params["rtd"] = device.rtd - params["chnl"] = device.chnl - #params["osvs"] = device.osvs - #params["ckot"] = device.ckot - return url, params - - def do_check(self, device: Device, https=True, timeout=10, max_tries=5): - """Perform update request with given parameters.""" - url, params = self.prep_check(device, https) - last_response = None - for _ in range(0, max_tries): - try: - reqtime_start = time.perf_counter() - req = self.sess.get(url, params=params, timeout=timeout) - reqtime = time.perf_counter() - reqtime_start - self.check_time_add(reqtime) - last_response = req - if req.status_code == 200: - self.master_server_vote_on_time(reqtime) - req.encoding = "utf-8" # Force encoding as server doesn't give one - self.write_dump(req.text) - return req.text - elif req.status_code not in [500, 502, 503]: - self.do_check_errorhandle(req, reqtime) - except requests.exceptions.Timeout: - pass - # Something went wrong, try a different server - self.master_server_downvote() - self.g2master = self.get_master_server() - url = self.prep_check_url(https) - raise requests.exceptions.RetryError("Max tries ({}) reached.".format(max_tries), response=last_response) - - def do_check_errorhandle(self, req, reqtime): - """Handle non-HTTP 200 results for ``do_check``.""" - errcodes = defaultdict(lambda: "HTTP {}.".format(req.status_code)) - errcodes[204] = "No update available." - errcodes[404] = "No data for requested CUREF/FV combination." - if req.status_code in [204, 404]: - self.master_server_vote_on_time(reqtime) - elif req.status_code not in [500, 502, 503]: - self.master_server_downvote() - req.raise_for_status() - raise requests.exceptions.HTTPError(errcodes[req.status_code], response=req) - - @staticmethod - def parse_check(xmlstr): - """Parse output of ``do_check``.""" - root = ElementTree.fromstring(xmlstr) - curef = root.find("CUREF").text - fvver = root.find("VERSION").find("FV").text - tvver = root.find("VERSION").find("TV").text - fw_id = root.find("FIRMWARE").find("FW_ID").text - fileinfo = root.find("FIRMWARE").find("FILESET").find("FILE") - fileid = fileinfo.find("FILE_ID").text - filename = fileinfo.find("FILENAME").text - filesize = fileinfo.find("SIZE").text - filehash = fileinfo.find("CHECKSUM").text - return curef, fvver, tvver, fw_id, fileid, filename, filesize, filehash diff --git a/tcllib/tclchecksum.py b/tcllib/tclchecksum.py deleted file mode 100644 index 84b281f..0000000 --- a/tcllib/tclchecksum.py +++ /dev/null @@ -1,56 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -# pylint: disable=C0111,C0326,C0103 - -"""Tools to interface with TCL's checksum API.""" - -import json - -from defusedxml import ElementTree - -from . import credentials - - -class TclChecksumMixin: - """A mixin component for TCL's checksum API.""" - - @staticmethod - def prep_checksum(encslave, address, uri): - """Prepare URL and parameters for checksum request.""" - url = "http://" + encslave + "/checksum.php" - params = credentials.get_creds2() - payload = {address: uri} - payload_json = json.dumps(payload) - params[b"address"] = bytes(payload_json, "utf-8") - return url, params - - def do_checksum(self, encslave, address, uri): - """Perform checksum request with given parameters.""" - url, params = self.prep_checksum(encslave, address, uri) - # print(repr(dict(params))) - req = self.sess.post(url, data=params) - if req.status_code == 200: - req.encoding = "utf-8" # Force encoding as server doesn't give one - self.write_dump(req.text) - # 2abfa6f6507044fec995efede5d818e62a0b19b5 means ERROR (invalid ADDRESS!) - if "2abfa6f6507044fec995efede5d818e62a0b19b5" in req.text: - print("INVALID URI: {}".format(uri)) - raise SystemExit - return req.text - else: - print("CHECKSUM: " + repr(req)) - print(repr(req.headers)) - print(repr(req.text)) - raise SystemExit - - @staticmethod - def parse_checksum(xmlstr): - """Parse output of ``do_checksum``.""" - root = ElementTree.fromstring(xmlstr) - file = root.find("FILE_CHECKSUM_LIST").find("FILE") - file_addr = file.find("ADDRESS").text - sha1_enc_footer = file.find("ENCRYPT_FOOTER").text - sha1_footer = file.find("FOOTER").text - sha1_body = file.find("BODY").text - return file_addr, sha1_body, sha1_enc_footer, sha1_footer diff --git a/tcllib/tclencheader.py b/tcllib/tclencheader.py deleted file mode 100644 index 33dde10..0000000 --- a/tcllib/tclencheader.py +++ /dev/null @@ -1,27 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -# pylint: disable=C0111,C0326,C0103 - -"""Tools to interface with TCL's encrypted header API.""" - -from . import credentials - - -class TclEncHeaderMixin: - """A mixin component for TCL's encrypted header API..""" - - def do_encrypt_header(self, encslave, address): - """Perform encrypted header request with given parameters.""" - params = credentials.get_creds2() - params[b"address"] = bytes(address, "utf-8") - url = "http://" + encslave + "/encrypt_header.php" - req = self.sess.post(url, data=params, verify=False) - # Expect "HTTP 206 Partial Content" response - if req.status_code == 206: - return req.content - else: - print("ENCRYPT: " + repr(req)) - print(repr(req.headers)) - print(repr(req.text)) - raise SystemExit diff --git a/tcllib/tclrequest.py b/tcllib/tclrequest.py deleted file mode 100644 index 465ebd2..0000000 --- a/tcllib/tclrequest.py +++ /dev/null @@ -1,123 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -# pylint: disable=C0111,C0326,C0103 - -"""Tools to interface with TCL's download request API.""" - -import binascii -import hashlib -import random -import time -import zlib -from collections import OrderedDict -from math import floor - -from defusedxml import ElementTree - - -''' - private HashMap buildDownloadUrisParams(UpdatePackageInfo updatePackageInfo) { - FotaLog.m28v(TAG, "doAfterCheck"); - String salt = FotaUtil.salt(); - HashMap linkedHashMap = new LinkedHashMap(); - linkedHashMap.put("id", this.internalBuilder.getParam("id")); - linkedHashMap.put("salt", salt); - linkedHashMap.put("curef", updatePackageInfo.mCuref); - linkedHashMap.put("fv", updatePackageInfo.mFv); - linkedHashMap.put("tv", updatePackageInfo.mTv); - linkedHashMap.put("type", "Firmware"); - linkedHashMap.put("fw_id", updatePackageInfo.mFirmwareId); - linkedHashMap.put("mode", "2"); - linkedHashMap.put("vk", generateVk2((LinkedHashMap) linkedHashMap.clone())); - linkedHashMap.put("cltp", "10"); - linkedHashMap.put("cktp", this.internalBuilder.getParam("cktp")); - linkedHashMap.put("rtd", this.internalBuilder.getParam("rtd")); - linkedHashMap.put("chnl", this.internalBuilder.getParam("chnl")); - return linkedHashMap; - } -''' - -VDKEY_B64Z = b"eJwdjwEOwDAIAr8kKFr//7HhmqXp8AIIDrYAgg8byiUXrwRJRXja+d6iNxu0AhUooDCN9rd6rDLxmGIakUVWo3IGCTRWqCAt6X4jGEIUAxgN0eYWnp+LkpHQAg/PsO90ELsy0Npm/n2HbtPndFgGEV31R9OmT4O4nrddjc3Qt6nWscx7e+WRHq5UnOudtjw5skuV09pFhvmqnOEIs4ljPeel1wfLYUF4\n" - - -def get_salt(): - """Generate cryptographic salt.""" - millis = floor(time.time() * 1000) - tail = "{:06d}".format(random.randint(0, 999999)) - return "{}{}".format(str(millis), tail) - - -def get_vk2(params_dict, cltp): - """Generate salted hash of API parameters.""" - params_dict["cltp"] = cltp - query = "" - for key, val in params_dict.items(): - if query: - query += "&" - query += key + "=" + str(val) - vdk = zlib.decompress(binascii.a2b_base64(VDKEY_B64Z)) - query += vdk.decode("utf-8") - engine = hashlib.sha1() - engine.update(bytes(query, "utf-8")) - hexhash = engine.hexdigest() - return hexhash - - -class TclRequestMixin: - """A mixin component for TCL's download request API.""" - - def prep_request(self, curef, fvver, tvver, fw_id): - """Prepare URL and device parameters for download request.""" - url = "https://" + self.g2master + "/download_request.php" - params = OrderedDict() - params["id"] = self.serid - params["salt"] = get_salt() - params["curef"] = curef - params["fv"] = fvver - params["tv"] = tvver - params["type"] = self.ftype - params["fw_id"] = fw_id - params["mode"] = self.mode.value - params["vk"] = get_vk2(params, self.cltp.value) - params["cltp"] = self.cltp.value - params["cktp"] = self.cktp.value - params["rtd"] = self.rtd.value - if self.mode == self.MODE.FULL: - params["foot"] = 1 - params["chnl"] = self.chnl.value - return url, params - - def do_request(self, curef, fvver, tvver, fw_id): - """Perform download request with given parameters.""" - url, params = self.prep_request(curef, fvver, tvver, fw_id) - # print(repr(dict(params))) - req = self.sess.post(url, data=params) - if req.status_code == 200: - req.encoding = "utf-8" # Force encoding as server doesn't give one - self.write_dump(req.text) - return req.text - else: - print("REQUEST: " + repr(req)) - print(repr(req.headers)) - print(repr(req.text)) - raise SystemExit - - @staticmethod - def parse_request(xmlstr): - """Parse output of ``do_request``.""" - root = ElementTree.fromstring(xmlstr) - file = root.find("FILE_LIST").find("FILE") - fileid = file.find("FILE_ID").text - fileurl = file.find("DOWNLOAD_URL").text - s3_fileurl_node = file.find("S3_DOWNLOAD_URL") - s3_fileurl = "" - if s3_fileurl_node: - s3_fileurl = s3_fileurl_node.text - slave_list = root.find("SLAVE_LIST").findall("SLAVE") - enc_list = root.find("SLAVE_LIST").findall("ENCRYPT_SLAVE") - s3_slave_list = root.find("SLAVE_LIST").findall("S3_SLAVE") - slaves = [s.text for s in slave_list] - encslaves = [s.text for s in enc_list] - s3_slaves = [s.text for s in s3_slave_list] - return fileid, fileurl, slaves, encslaves, s3_fileurl, s3_slaves diff --git a/tcllib/xmltools.py b/tcllib/xmltools.py deleted file mode 100644 index 5933a19..0000000 --- a/tcllib/xmltools.py +++ /dev/null @@ -1,14 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -# pylint: disable=C0111,C0326,C0103 - -"""XML tools.""" - -import xml.dom.minidom - - -def pretty_xml(xmlstr): - """Prettify input XML with ``xml.dom.minidom``.""" - mdx = xml.dom.minidom.parseString(xmlstr) - return mdx.toprettyxml(indent=" ")