From 34ba8a0db01177344ec1e9e98e6a321bda1b8c4d Mon Sep 17 00:00:00 2001 From: Markus Birth Date: Sat, 3 Feb 2018 01:36:08 +0100 Subject: [PATCH] Took a hacksaw and split tcllib into several small parts. --- tclcheck_allfull.py | 5 +- tclcheck_allota.py | 3 +- tclcheck_findprd.py | 3 +- tclcheck_findprd2.py | 3 +- tclcheck_findver.py | 3 +- tcllib/__init__.py | 392 +++-------------------------------------- tcllib/ansi.py | 16 ++ tcllib/credentials.py | 22 +++ tcllib/devlist.py | 65 +++++++ tcllib/dumpmgr.py | 34 ++++ tcllib/servervote.py | 36 ++++ tcllib/tclcheck.py | 70 ++++++++ tcllib/tclchecksum.py | 39 ++++ tcllib/tclencheader.py | 16 ++ tcllib/tclrequest.py | 102 +++++++++++ tcllib/xmltools.py | 9 + 16 files changed, 440 insertions(+), 378 deletions(-) create mode 100644 tcllib/ansi.py create mode 100644 tcllib/credentials.py create mode 100644 tcllib/devlist.py create mode 100644 tcllib/dumpmgr.py create mode 100644 tcllib/servervote.py create mode 100644 tcllib/tclcheck.py create mode 100644 tcllib/tclchecksum.py create mode 100644 tcllib/tclencheader.py create mode 100644 tcllib/tclrequest.py create mode 100644 tcllib/xmltools.py diff --git a/tclcheck_allfull.py b/tclcheck_allfull.py index 6922991..a277a9f 100644 --- a/tclcheck_allfull.py +++ b/tclcheck_allfull.py @@ -6,6 +6,7 @@ import sys import tcllib import tcllib.argparser +from tcllib import ansi from requests.exceptions import RequestException, Timeout tcllib.make_escapes_work() @@ -46,8 +47,8 @@ for prd, variant in prds.items(): txt_tv = tv if tv != lastver: txt_tv = "{} (old: {} / OTA: {})".format( - tcllib.ANSI_CYAN + txt_tv + tcllib.ANSI_RESET, - tcllib.ANSI_CYAN_DARK + variant["last_full"] + tcllib.ANSI_RESET, + ansi.CYAN + txt_tv + ansi.RESET, + ansi.CYAN_DARK + variant["last_full"] + ansi.RESET, variant["last_ota"] ) else: diff --git a/tclcheck_allota.py b/tclcheck_allota.py index 598155f..5e7b754 100644 --- a/tclcheck_allota.py +++ b/tclcheck_allota.py @@ -8,6 +8,7 @@ import requests import sys import tcllib import tcllib.argparser +from tcllib import ansi from requests.exceptions import RequestException tcllib.make_escapes_work() @@ -52,7 +53,7 @@ for prd, variant in prds.items(): fc.fv = lastver check_xml = fc.do_check(max_tries=20) curef, fv, tv, fw_id, fileid, fn, fsize, fhash = fc.parse_check(check_xml) - versioninfo = tcllib.ANSI_YELLOW_DARK + fv + tcllib.ANSI_RESET + " ⇨ " + tcllib.ANSI_YELLOW + tv + tcllib.ANSI_RESET + " (FULL: {})".format(variant["last_full"]) + versioninfo = ansi.YELLOW_DARK + fv + ansi.RESET + " ⇨ " + ansi.YELLOW + tv + ansi.RESET + " (FULL: {})".format(variant["last_full"]) print("{}: {} {} ({})".format(prd, versioninfo, fhash, model)) except RequestException as e: print("{} ({}): {}".format(prd, lastver, str(e))) diff --git a/tclcheck_findprd.py b/tclcheck_findprd.py index 490207e..82c9431 100755 --- a/tclcheck_findprd.py +++ b/tclcheck_findprd.py @@ -7,6 +7,7 @@ import collections import sys import tcllib import tcllib.argparser +from tcllib import ansi from requests.exceptions import RequestException, Timeout tcllib.make_escapes_work() @@ -68,7 +69,7 @@ for center in sorted(prddict.keys()): curef = "PRD-{}-{:03}".format(center, j) done_count += 1 print("Checking {} ({}/{})".format(curef, done_count, total_count)) - print(tcllib.ANSI_UP_DEL, end="") + print(ansi.UP_DEL, end="") try: fc.reset_session() fc.curef = curef diff --git a/tclcheck_findprd2.py b/tclcheck_findprd2.py index 7b709df..dc4712a 100644 --- a/tclcheck_findprd2.py +++ b/tclcheck_findprd2.py @@ -7,6 +7,7 @@ import collections import sys import tcllib import tcllib.argparser +from tcllib import ansi from requests.exceptions import RequestException, Timeout # Variants to scan for @@ -56,7 +57,7 @@ for center in to_scan: curef = "PRD-{:05}-{:3}".format(center, j) done_count += 1 print("Checking {} ({}/{})".format(curef, done_count, total_count)) - print(tcllib.ANSI_UP_DEL, end="") + print(ansi.UP_DEL, end="") try: fc.reset_session() fc.curef = curef diff --git a/tclcheck_findver.py b/tclcheck_findver.py index fcbdc4e..d09627c 100755 --- a/tclcheck_findver.py +++ b/tclcheck_findver.py @@ -6,6 +6,7 @@ import sys import tcllib import tcllib.argparser +from tcllib import ansi from requests.exceptions import RequestException, Timeout tcllib.make_escapes_work() @@ -54,7 +55,7 @@ total_count = len(allvers) for fv in allvers: done_count += 1 print("Checking {} ({}/{})".format(fv, done_count, total_count)) - print(tcllib.ANSI_UP_DEL, end="") + print(ansi.UP_DEL, end="") try: fc.reset_session() fc.fv = fv diff --git a/tcllib/__init__.py b/tcllib/__init__.py index 93ad56d..c60ec4e 100644 --- a/tcllib/__init__.py +++ b/tcllib/__init__.py @@ -2,43 +2,18 @@ # pylint: disable=C0111,C0326 -import base64 -import binascii import enum -import errno -import glob -import hashlib -import json -import os import platform -import random -import time -import xml.dom.minidom -import zlib -from collections import OrderedDict -from math import floor -import numpy import requests -from defusedxml import ElementTree - -DEVICELIST_URL = "https://tclota.birth-online.de/json_lastupdates.php" -DEVICELIST_FILE = "prds.json" -DEVICELIST_CACHE_SECONDS = 86400 - -ANSI_UP_DEL = u"\u001b[F\u001b[K" -ANSI_BLACK = u"\u001b[0;30m" -ANSI_RED_DARK = u"\u001b[0;31m" -ANSI_GREEN_DARK = u"\u001b[0;32m" -ANSI_YELLOW_DARK = u"\u001b[0;33m" -ANSI_CYAN_DARK = u"\u001b[0;36m" -ANSI_SILVER = u"\u001b[0;37m" -ANSI_GREY = u"\u001b[1;30m" -ANSI_RED = u"\u001b[1;31m" -ANSI_GREEN = u"\u001b[1;32m" -ANSI_YELLOW = u"\u001b[1;33m" -ANSI_CYAN = u"\u001b[1;36m" -ANSI_WHITE = u"\u001b[1;37m" -ANSI_RESET = u"\u001b[0m" +from . import credentials +from . import devlist +from . import dumpmgr +from . import servervote +from . import tclcheck +from . import tclrequest +from . import tclchecksum +from . import tclencheader +from . import xmltools def make_escapes_work(): system = platform.system() @@ -54,7 +29,17 @@ def make_escapes_work(): def default_enum(enumname, vardict): return enum.IntEnum(enumname, vardict, module=__name__, qualname="tcllib.FotaCheck.{}".format(enumname)) -class FotaCheck: +class FotaCheck( + tclcheck.TclCheck, + tclrequest.TclRequest, + tclchecksum.TclChecksum, + tclencheader.TclEncHeader, + servervote.ServerVote, + credentials.Credentials, + devlist.DevList, + dumpmgr.DumpMgr, + xmltools.XmlTools + ): VDKEY = b"eJwdjwEOwDAIAr8kKFr//7HhmqXp8AIIDrYAgg8byiUXrwRJRXja+d6iNxu0AhUooDCN9rd6rDLxmGIakUVWo3IGCTRWqCAt6X4jGEIUAxgN0eYWnp+LkpHQAg/PsO90ELsy0Npm/n2HbtPndFgGEV31R9OmT4O4nrddjc3Qt6nWscx7e+WRHq5UnOudtjw5skuV09pFhvmqnOEIs4ljPeel1wfLYUF4\n" CKTP = default_enum("CKTP", ["AUTO", "MANUAL"]) MODE = default_enum("MODE", {"OTA": 2, "FULL": 4}) @@ -98,340 +83,3 @@ class FotaCheck: else: self.sess.headers.update({"User-Agent": "tcl"}) return self.sess - - @staticmethod - def get_salt(): - millis = floor(time.time() * 1000) - tail = "{:06d}".format(random.randint(0, 999999)) - return "{}{}".format(str(millis), tail) - - def get_master_server(self): - weight_sum = 0 - for i in self.master_servers_weights: - weight_sum += i - numpy_weights = [] - for i in self.master_servers_weights: - numpy_weights.append(i/weight_sum) - return numpy.random.choice(self.master_servers, p=numpy_weights) - - def master_server_downvote(self): - idx = self.master_servers.index(self.g2master) - if self.master_servers_weights[idx] > 1: - self.master_servers_weights[idx] -= 1 - - def master_server_upvote(self): - idx = self.master_servers.index(self.g2master) - if self.master_servers_weights[idx] < 10: - self.master_servers_weights[idx] += 1 - - def check_time_add(self, duration): - self.check_time_sum += duration - self.check_time_count += 1 - - def check_time_avg(self): - return (self.check_time_sum / self.check_time_count) - - def master_server_vote_on_time(self, last_duration, avg_duration): - if last_duration < avg_duration - 0.5: - self.master_server_upvote() - elif last_duration > avg_duration + 0.5: - self.master_server_downvote() - - def write_dump(self, data): - outfile = os.path.normpath("logs/{}.xml".format(self.get_salt())) - if not os.path.exists(os.path.dirname(outfile)): - try: - os.makedirs(os.path.dirname(outfile)) - except OSError as e: - if e.errno != errno.EEXIST: - raise - with open(outfile, "w", encoding="utf-8") as f: - f.write(data) - self.last_dump_filename = outfile - - def delete_last_dump(self): - if self.last_dump_filename: - os.unlink(self.last_dump_filename) - self.last_dump_filename = None - - @staticmethod - def write_info_if_dumps_found(): - # To disable this info, uncomment the following line. - #return - files = glob.glob(os.path.normpath("logs/*.xml")) - if len(files) > 0: - print() - print("{}There are {} logs collected in the logs/ directory.{} Please consider uploading".format(ANSI_YELLOW, len(files), ANSI_RESET)) - print("them to https://tclota.birth-online.de/ by running {}./upload_logs.py{}.".format(ANSI_CYAN, ANSI_RESET)) - - def do_check(self, https=True, timeout=10, max_tries=5): - protocol = "https://" if https else "http://" - url = protocol + self.g2master + "/check.php" - params = OrderedDict() - params["id"] = self.serid - params["curef"] = self.curef - params["fv"] = self.fv - params["mode"] = self.mode.value - params["type"] = self.ftype - params["cltp"] = self.cltp.value - params["cktp"] = self.cktp.value - params["rtd"] = self.rtd.value - params["chnl"] = self.chnl.value - #params["osvs"] = self.osvs - #params["ckot"] = self.ckot.value - - last_response = None - for num_try in range(0, max_tries): - try: - reqtime_start = time.perf_counter() - req = self.sess.get(url, params=params, timeout=timeout) - reqtime = time.perf_counter() - reqtime_start - reqtime_avg = self.check_time_avg() - self.check_time_add(reqtime) - last_response = req - if req.status_code == 200: - self.master_server_vote_on_time(reqtime, reqtime_avg) - req.encoding = "utf-8" # Force encoding as server doesn't give one - self.write_dump(req.text) - return req.text - elif req.status_code == 204: - self.master_server_vote_on_time(reqtime, reqtime_avg) - raise requests.exceptions.HTTPError("No update available.", response=req) - elif req.status_code == 404: - self.master_server_vote_on_time(reqtime, reqtime_avg) - raise requests.exceptions.HTTPError("No data for requested CUREF/FV combination.", response=req) - elif req.status_code not in [500, 502, 503]: - self.master_server_downvote() - req.raise_for_status() - raise requests.exceptions.HTTPError("HTTP {}.".format(req.status_code), response=req) - except requests.exceptions.Timeout: - pass - # Something went wrong, try a different server - self.master_server_downvote() - self.g2master = self.get_master_server() - protocol = "https://" if https else "http://" - url = protocol + self.g2master + "/check.php" - raise requests.exceptions.RetryError("Max tries ({}) reached.".format(max_tries), response=last_response) - - @staticmethod - def pretty_xml(xmlstr): - mdx = xml.dom.minidom.parseString(xmlstr) - return mdx.toprettyxml(indent=" ") - - @staticmethod - def parse_check(xmlstr): - root = ElementTree.fromstring(xmlstr) - curef = root.find("CUREF").text - fv = root.find("VERSION").find("FV").text - tv = root.find("VERSION").find("TV").text - fw_id = root.find("FIRMWARE").find("FW_ID").text - fileinfo = root.find("FIRMWARE").find("FILESET").find("FILE") - fileid = fileinfo.find("FILE_ID").text - filename = fileinfo.find("FILENAME").text - filesize = fileinfo.find("SIZE").text - filehash = fileinfo.find("CHECKSUM").text - return curef, fv, tv, fw_id, fileid, filename, filesize, filehash - - def get_vk2(self, params_dict, cltp): - params_dict["cltp"] = cltp - query = "" - for k, v in params_dict.items(): - if len(query) > 0: - query += "&" - query += k + "=" + str(v) - vdk = zlib.decompress(binascii.a2b_base64(self.VDKEY)) - query += vdk.decode("utf-8") - engine = hashlib.sha1() - engine.update(bytes(query, "utf-8")) - hexhash = engine.hexdigest() - return hexhash - - @staticmethod - def get_devicelist(force=False, output_diff=True): - need_download = True - - old_prds = None - try: - filestat = os.stat(DEVICELIST_FILE) - filemtime = filestat.st_mtime - if filemtime > time.time() - DEVICELIST_CACHE_SECONDS: - need_download = False - with open(DEVICELIST_FILE, "rt") as df: - old_prds = json.load(df) - except FileNotFoundError: - pass - - if need_download or force: - prds_json = requests.get(DEVICELIST_URL).text - with open(DEVICELIST_FILE, "wt") as df: - df.write(prds_json) - - with open(DEVICELIST_FILE, "rt") as df: - prds = json.load(df) - - if old_prds and output_diff: - FotaCheck.print_prd_diff(old_prds, prds) - - return prds - - @staticmethod - def print_prd_diff(old_prds, new_prds): - added_prds = [prd for prd in new_prds if prd not in old_prds] - removed_prds = [prd for prd in old_prds if prd not in new_prds] - for prd in removed_prds: - print("> Removed device {} (was at {} / OTA: {}).".format(ANSI_RED + prd + ANSI_RESET, old_prds[prd]["last_full"], old_prds[prd]["last_ota"])) - for prd in added_prds: - print("> New device {} ({} / OTA: {}).".format(ANSI_GREEN + prd + ANSI_RESET, new_prds[prd]["last_full"], new_prds[prd]["last_ota"])) - for prd, pdata in new_prds.items(): - if prd in added_prds: - continue - odata = old_prds[prd] - if pdata["last_full"] != odata["last_full"] and pdata["last_ota"] != odata["last_ota"]: - print("> {}: {} ⇨ {} (OTA: {} ⇨ {})".format( - prd, - ANSI_CYAN_DARK + str(odata["last_full"]) + ANSI_RESET, - ANSI_CYAN + str(pdata["last_full"]) + ANSI_RESET, - ANSI_YELLOW_DARK + str(odata["last_ota"]) + ANSI_RESET, - ANSI_YELLOW + str(pdata["last_ota"]) + ANSI_RESET - )) - elif pdata["last_full"] != odata["last_full"]: - print("> {}: {} ⇨ {} (FULL)".format(prd, ANSI_CYAN_DARK + str(odata["last_full"]) + ANSI_RESET, ANSI_CYAN + str(pdata["last_full"]) + ANSI_RESET)) - elif pdata["last_ota"] != odata["last_ota"]: - print("> {}: {} ⇨ {} (OTA)".format(prd, ANSI_YELLOW_DARK + str(odata["last_ota"]) + ANSI_RESET, ANSI_YELLOW + str(pdata["last_ota"]) + ANSI_RESET)) - - @staticmethod - def get_creds(): - creds = { - b"YWNjb3VudA==": b"emhlbmdodWEuZ2Fv", - b"cGFzc3dvcmQ=": b"cWFydUQ0b2s=", - } - params = {base64.b64decode(key): base64.b64decode(val) for key, val in creds.items()} - return params - - @staticmethod - def get_creds2(): - creds = { - b"YWNjb3VudA==": b"VGVsZUV4dFRlc3Q=", - b"cGFzc3dvcmQ=": b"dDA1MjM=", - } - params = {base64.b64decode(key): base64.b64decode(val) for key, val in creds.items()} - return params - - ''' - private HashMap buildDownloadUrisParams(UpdatePackageInfo updatePackageInfo) { - FotaLog.m28v(TAG, "doAfterCheck"); - String salt = FotaUtil.salt(); - HashMap linkedHashMap = new LinkedHashMap(); - linkedHashMap.put("id", this.internalBuilder.getParam("id")); - linkedHashMap.put("salt", salt); - linkedHashMap.put("curef", updatePackageInfo.mCuref); - linkedHashMap.put("fv", updatePackageInfo.mFv); - linkedHashMap.put("tv", updatePackageInfo.mTv); - linkedHashMap.put("type", "Firmware"); - linkedHashMap.put("fw_id", updatePackageInfo.mFirmwareId); - linkedHashMap.put("mode", "2"); - linkedHashMap.put("vk", generateVk2((LinkedHashMap) linkedHashMap.clone())); - linkedHashMap.put("cltp", "10"); - linkedHashMap.put("cktp", this.internalBuilder.getParam("cktp")); - linkedHashMap.put("rtd", this.internalBuilder.getParam("rtd")); - linkedHashMap.put("chnl", this.internalBuilder.getParam("chnl")); - return linkedHashMap; - } - ''' - - def do_request(self, curef, fv, tv, fw_id): - url = "https://" + self.g2master + "/download_request.php" - params = OrderedDict() - params["id"] = self.serid - params["salt"] = self.get_salt() - params["curef"] = curef - params["fv"] = fv - params["tv"] = tv - params["type"] = self.ftype - params["fw_id"] = fw_id - params["mode"] = self.mode.value - params["vk"] = self.get_vk2(params, self.cltp.value) - params["cltp"] = self.cltp.value - params["cktp"] = self.cktp.value - params["rtd"] = self.rtd.value - if self.mode == self.MODE.FULL: - params["foot"] = 1 - params["chnl"] = self.chnl.value - - #print(repr(dict(params))) - req = self.sess.post(url, data=params) - if req.status_code == 200: - req.encoding = "utf-8" # Force encoding as server doesn't give one - self.write_dump(req.text) - return req.text - else: - print("REQUEST: " + repr(req)) - print(repr(req.headers)) - print(repr(req.text)) - raise SystemExit - - @staticmethod - def parse_request(xmlstr): - root = ElementTree.fromstring(xmlstr) - file = root.find("FILE_LIST").find("FILE") - fileid = file.find("FILE_ID").text - fileurl = file.find("DOWNLOAD_URL").text - s3_fileurl_node = file.find("S3_DOWNLOAD_URL") - s3_fileurl = "" - if s3_fileurl_node: - s3_fileurl = s3_fileurl_node.text - slave_list = root.find("SLAVE_LIST").findall("SLAVE") - enc_list = root.find("SLAVE_LIST").findall("ENCRYPT_SLAVE") - s3_slave_list = root.find("SLAVE_LIST").findall("S3_SLAVE") - slaves = [s.text for s in slave_list] - encslaves = [s.text for s in enc_list] - s3_slaves = [s.text for s in s3_slave_list] - return fileid, fileurl, slaves, encslaves, s3_fileurl, s3_slaves - - def do_encrypt_header(self, encslave, address): - params = self.get_creds2() - params[b"address"] = bytes(address, "utf-8") - url = "http://" + encslave + "/encrypt_header.php" - req = self.sess.post(url, data=params, verify=False) - # Expect "HTTP 206 Partial Content" response - if req.status_code == 206: - return req.content - else: - print("ENCRYPT: " + repr(req)) - print(repr(req.headers)) - print(repr(req.text)) - raise SystemExit - - def do_checksum(self, encslave, address, uri): - url = "http://" + encslave + "/checksum.php" - params = self.get_creds2() - - payload = {address: uri} - payload_json = json.dumps(payload) - params[b"address"] = bytes(payload_json, "utf-8") - - #print(repr(dict(params))) - req = self.sess.post(url, data=params) - if req.status_code == 200: - req.encoding = "utf-8" # Force encoding as server doesn't give one - self.write_dump(req.text) - # 2abfa6f6507044fec995efede5d818e62a0b19b5 means ERROR (invalid ADDRESS!) - if "2abfa6f6507044fec995efede5d818e62a0b19b5" in req.text: - print("INVALID URI: {}".format(uri)) - raise SystemExit - return req.text - else: - print("CHECKSUM: " + repr(req)) - print(repr(req.headers)) - print(repr(req.text)) - raise SystemExit - - @staticmethod - def parse_checksum(xmlstr): - root = ElementTree.fromstring(xmlstr) - file = root.find("FILE_CHECKSUM_LIST").find("FILE") - file_addr = file.find("ADDRESS").text - sha1_enc_footer = file.find("ENCRYPT_FOOTER").text - sha1_footer = file.find("FOOTER").text - sha1_body = file.find("BODY").text - return file_addr, sha1_body, sha1_enc_footer, sha1_footer diff --git a/tcllib/ansi.py b/tcllib/ansi.py new file mode 100644 index 0000000..590bfb8 --- /dev/null +++ b/tcllib/ansi.py @@ -0,0 +1,16 @@ +# -*- coding: utf-8 -*- + +UP_DEL = u"\u001b[F\u001b[K" +BLACK = u"\u001b[0;30m" +RED_DARK = u"\u001b[0;31m" +GREEN_DARK = u"\u001b[0;32m" +YELLOW_DARK = u"\u001b[0;33m" +CYAN_DARK = u"\u001b[0;36m" +SILVER = u"\u001b[0;37m" +GREY = u"\u001b[1;30m" +RED = u"\u001b[1;31m" +GREEN = u"\u001b[1;32m" +YELLOW = u"\u001b[1;33m" +CYAN = u"\u001b[1;36m" +WHITE = u"\u001b[1;37m" +RESET = u"\u001b[0m" diff --git a/tcllib/credentials.py b/tcllib/credentials.py new file mode 100644 index 0000000..10ef96c --- /dev/null +++ b/tcllib/credentials.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- + +import base64 + +class Credentials: + @staticmethod + def get_creds(): + creds = { + b"YWNjb3VudA==": b"emhlbmdodWEuZ2Fv", + b"cGFzc3dvcmQ=": b"cWFydUQ0b2s=", + } + params = {base64.b64decode(key): base64.b64decode(val) for key, val in creds.items()} + return params + + @staticmethod + def get_creds2(): + creds = { + b"YWNjb3VudA==": b"VGVsZUV4dFRlc3Q=", + b"cGFzc3dvcmQ=": b"dDA1MjM=", + } + params = {base64.b64decode(key): base64.b64decode(val) for key, val in creds.items()} + return params diff --git a/tcllib/devlist.py b/tcllib/devlist.py new file mode 100644 index 0000000..a6d92be --- /dev/null +++ b/tcllib/devlist.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- + +import json +import os +import requests +import time +from . import ansi + +DEVICELIST_URL = "https://tclota.birth-online.de/json_lastupdates.php" +DEVICELIST_FILE = "prds.json" +DEVICELIST_CACHE_SECONDS = 86400 + +class DevList: + @staticmethod + def get_devicelist(force=False, output_diff=True): + need_download = True + + old_prds = None + try: + filestat = os.stat(DEVICELIST_FILE) + filemtime = filestat.st_mtime + if filemtime > time.time() - DEVICELIST_CACHE_SECONDS: + need_download = False + with open(DEVICELIST_FILE, "rt") as df: + old_prds = json.load(df) + except FileNotFoundError: + pass + + if need_download or force: + prds_json = requests.get(DEVICELIST_URL).text + with open(DEVICELIST_FILE, "wt") as df: + df.write(prds_json) + + with open(DEVICELIST_FILE, "rt") as df: + prds = json.load(df) + + if old_prds and output_diff: + DevList.print_prd_diff(old_prds, prds) + + return prds + + @staticmethod + def print_prd_diff(old_prds, new_prds): + added_prds = [prd for prd in new_prds if prd not in old_prds] + removed_prds = [prd for prd in old_prds if prd not in new_prds] + for prd in removed_prds: + print("> Removed device {} (was at {} / OTA: {}).".format(ansi.RED + prd + ansi.RESET, old_prds[prd]["last_full"], old_prds[prd]["last_ota"])) + for prd in added_prds: + print("> New device {} ({} / OTA: {}).".format(ansi.GREEN + prd + ansi.RESET, new_prds[prd]["last_full"], new_prds[prd]["last_ota"])) + for prd, pdata in new_prds.items(): + if prd in added_prds: + continue + odata = old_prds[prd] + if pdata["last_full"] != odata["last_full"] and pdata["last_ota"] != odata["last_ota"]: + print("> {}: {} ⇨ {} (OTA: {} ⇨ {})".format( + prd, + ansi.CYAN_DARK + str(odata["last_full"]) + ansi.RESET, + ansi.CYAN + str(pdata["last_full"]) + ansi.RESET, + ansi.YELLOW_DARK + str(odata["last_ota"]) + ansi.RESET, + ansi.YELLOW + str(pdata["last_ota"]) + ansi.RESET + )) + elif pdata["last_full"] != odata["last_full"]: + print("> {}: {} ⇨ {} (FULL)".format(prd, ansi.CYAN_DARK + str(odata["last_full"]) + ansi.RESET, ansi.CYAN + str(pdata["last_full"]) + ansi.RESET)) + elif pdata["last_ota"] != odata["last_ota"]: + print("> {}: {} ⇨ {} (OTA)".format(prd, ansi.YELLOW_DARK + str(odata["last_ota"]) + ansi.RESET, ansi.YELLOW + str(pdata["last_ota"]) + ansi.RESET)) diff --git a/tcllib/dumpmgr.py b/tcllib/dumpmgr.py new file mode 100644 index 0000000..1be192a --- /dev/null +++ b/tcllib/dumpmgr.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- + +import errno +import glob +import os +from . import ansi + +class DumpMgr: + def write_dump(self, data): + outfile = os.path.normpath("logs/{}.xml".format(self.get_salt())) + if not os.path.exists(os.path.dirname(outfile)): + try: + os.makedirs(os.path.dirname(outfile)) + except OSError as e: + if e.errno != errno.EEXIST: + raise + with open(outfile, "w", encoding="utf-8") as f: + f.write(data) + self.last_dump_filename = outfile + + def delete_last_dump(self): + if self.last_dump_filename: + os.unlink(self.last_dump_filename) + self.last_dump_filename = None + + @staticmethod + def write_info_if_dumps_found(): + # To disable this info, uncomment the following line. + #return + files = glob.glob(os.path.normpath("logs/*.xml")) + if len(files) > 0: + print() + print("{}There are {} logs collected in the logs/ directory.{} Please consider uploading".format(ansi.YELLOW, len(files), ansi.RESET)) + print("them to https://tclota.birth-online.de/ by running {}./upload_logs.py{}.".format(ansi.CYAN, ansi.RESET)) diff --git a/tcllib/servervote.py b/tcllib/servervote.py new file mode 100644 index 0000000..405c727 --- /dev/null +++ b/tcllib/servervote.py @@ -0,0 +1,36 @@ +# -*- coding: utf-8 -*- + +import numpy + +class ServerVote: + def get_master_server(self): + weight_sum = 0 + for i in self.master_servers_weights: + weight_sum += i + numpy_weights = [] + for i in self.master_servers_weights: + numpy_weights.append(i/weight_sum) + return numpy.random.choice(self.master_servers, p=numpy_weights) + + def master_server_downvote(self): + idx = self.master_servers.index(self.g2master) + if self.master_servers_weights[idx] > 1: + self.master_servers_weights[idx] -= 1 + + def master_server_upvote(self): + idx = self.master_servers.index(self.g2master) + if self.master_servers_weights[idx] < 10: + self.master_servers_weights[idx] += 1 + + def check_time_add(self, duration): + self.check_time_sum += duration + self.check_time_count += 1 + + def check_time_avg(self): + return (self.check_time_sum / self.check_time_count) + + def master_server_vote_on_time(self, last_duration, avg_duration): + if last_duration < avg_duration - 0.5: + self.master_server_upvote() + elif last_duration > avg_duration + 0.5: + self.master_server_downvote() diff --git a/tcllib/tclcheck.py b/tcllib/tclcheck.py new file mode 100644 index 0000000..f370f44 --- /dev/null +++ b/tcllib/tclcheck.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- + +import time +from collections import OrderedDict +import requests +from defusedxml import ElementTree + +class TclCheck: + def do_check(self, https=True, timeout=10, max_tries=5): + protocol = "https://" if https else "http://" + url = protocol + self.g2master + "/check.php" + params = OrderedDict() + params["id"] = self.serid + params["curef"] = self.curef + params["fv"] = self.fv + params["mode"] = self.mode.value + params["type"] = self.ftype + params["cltp"] = self.cltp.value + params["cktp"] = self.cktp.value + params["rtd"] = self.rtd.value + params["chnl"] = self.chnl.value + #params["osvs"] = self.osvs + #params["ckot"] = self.ckot.value + + last_response = None + for num_try in range(0, max_tries): + try: + reqtime_start = time.perf_counter() + req = self.sess.get(url, params=params, timeout=timeout) + reqtime = time.perf_counter() - reqtime_start + reqtime_avg = self.check_time_avg() + self.check_time_add(reqtime) + last_response = req + if req.status_code == 200: + self.master_server_vote_on_time(reqtime, reqtime_avg) + req.encoding = "utf-8" # Force encoding as server doesn't give one + self.write_dump(req.text) + return req.text + elif req.status_code == 204: + self.master_server_vote_on_time(reqtime, reqtime_avg) + raise requests.exceptions.HTTPError("No update available.", response=req) + elif req.status_code == 404: + self.master_server_vote_on_time(reqtime, reqtime_avg) + raise requests.exceptions.HTTPError("No data for requested CUREF/FV combination.", response=req) + elif req.status_code not in [500, 502, 503]: + self.master_server_downvote() + req.raise_for_status() + raise requests.exceptions.HTTPError("HTTP {}.".format(req.status_code), response=req) + except requests.exceptions.Timeout: + pass + # Something went wrong, try a different server + self.master_server_downvote() + self.g2master = self.get_master_server() + protocol = "https://" if https else "http://" + url = protocol + self.g2master + "/check.php" + raise requests.exceptions.RetryError("Max tries ({}) reached.".format(max_tries), response=last_response) + + @staticmethod + def parse_check(xmlstr): + root = ElementTree.fromstring(xmlstr) + curef = root.find("CUREF").text + fv = root.find("VERSION").find("FV").text + tv = root.find("VERSION").find("TV").text + fw_id = root.find("FIRMWARE").find("FW_ID").text + fileinfo = root.find("FIRMWARE").find("FILESET").find("FILE") + fileid = fileinfo.find("FILE_ID").text + filename = fileinfo.find("FILENAME").text + filesize = fileinfo.find("SIZE").text + filehash = fileinfo.find("CHECKSUM").text + return curef, fv, tv, fw_id, fileid, filename, filesize, filehash diff --git a/tcllib/tclchecksum.py b/tcllib/tclchecksum.py new file mode 100644 index 0000000..13fba15 --- /dev/null +++ b/tcllib/tclchecksum.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- + +import json +from defusedxml import ElementTree + +class TclChecksum: + def do_checksum(self, encslave, address, uri): + url = "http://" + encslave + "/checksum.php" + params = self.get_creds2() + + payload = {address: uri} + payload_json = json.dumps(payload) + params[b"address"] = bytes(payload_json, "utf-8") + + #print(repr(dict(params))) + req = self.sess.post(url, data=params) + if req.status_code == 200: + req.encoding = "utf-8" # Force encoding as server doesn't give one + self.write_dump(req.text) + # 2abfa6f6507044fec995efede5d818e62a0b19b5 means ERROR (invalid ADDRESS!) + if "2abfa6f6507044fec995efede5d818e62a0b19b5" in req.text: + print("INVALID URI: {}".format(uri)) + raise SystemExit + return req.text + else: + print("CHECKSUM: " + repr(req)) + print(repr(req.headers)) + print(repr(req.text)) + raise SystemExit + + @staticmethod + def parse_checksum(xmlstr): + root = ElementTree.fromstring(xmlstr) + file = root.find("FILE_CHECKSUM_LIST").find("FILE") + file_addr = file.find("ADDRESS").text + sha1_enc_footer = file.find("ENCRYPT_FOOTER").text + sha1_footer = file.find("FOOTER").text + sha1_body = file.find("BODY").text + return file_addr, sha1_body, sha1_enc_footer, sha1_footer diff --git a/tcllib/tclencheader.py b/tcllib/tclencheader.py new file mode 100644 index 0000000..47e1a24 --- /dev/null +++ b/tcllib/tclencheader.py @@ -0,0 +1,16 @@ +# -*- coding: utf-8 -*- + +class TclEncHeader: + def do_encrypt_header(self, encslave, address): + params = self.get_creds2() + params[b"address"] = bytes(address, "utf-8") + url = "http://" + encslave + "/encrypt_header.php" + req = self.sess.post(url, data=params, verify=False) + # Expect "HTTP 206 Partial Content" response + if req.status_code == 206: + return req.content + else: + print("ENCRYPT: " + repr(req)) + print(repr(req.headers)) + print(repr(req.text)) + raise SystemExit diff --git a/tcllib/tclrequest.py b/tcllib/tclrequest.py new file mode 100644 index 0000000..6bdc4fb --- /dev/null +++ b/tcllib/tclrequest.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- + +import binascii +import hashlib +import random +import time +from math import floor +import zlib +from collections import OrderedDict +from defusedxml import ElementTree + +''' + private HashMap buildDownloadUrisParams(UpdatePackageInfo updatePackageInfo) { + FotaLog.m28v(TAG, "doAfterCheck"); + String salt = FotaUtil.salt(); + HashMap linkedHashMap = new LinkedHashMap(); + linkedHashMap.put("id", this.internalBuilder.getParam("id")); + linkedHashMap.put("salt", salt); + linkedHashMap.put("curef", updatePackageInfo.mCuref); + linkedHashMap.put("fv", updatePackageInfo.mFv); + linkedHashMap.put("tv", updatePackageInfo.mTv); + linkedHashMap.put("type", "Firmware"); + linkedHashMap.put("fw_id", updatePackageInfo.mFirmwareId); + linkedHashMap.put("mode", "2"); + linkedHashMap.put("vk", generateVk2((LinkedHashMap) linkedHashMap.clone())); + linkedHashMap.put("cltp", "10"); + linkedHashMap.put("cktp", this.internalBuilder.getParam("cktp")); + linkedHashMap.put("rtd", this.internalBuilder.getParam("rtd")); + linkedHashMap.put("chnl", this.internalBuilder.getParam("chnl")); + return linkedHashMap; + } +''' + +class TclRequest: + @staticmethod + def get_salt(): + millis = floor(time.time() * 1000) + tail = "{:06d}".format(random.randint(0, 999999)) + return "{}{}".format(str(millis), tail) + + def get_vk2(self, params_dict, cltp): + params_dict["cltp"] = cltp + query = "" + for k, v in params_dict.items(): + if len(query) > 0: + query += "&" + query += k + "=" + str(v) + vdk = zlib.decompress(binascii.a2b_base64(self.VDKEY)) + query += vdk.decode("utf-8") + engine = hashlib.sha1() + engine.update(bytes(query, "utf-8")) + hexhash = engine.hexdigest() + return hexhash + + def do_request(self, curef, fv, tv, fw_id): + url = "https://" + self.g2master + "/download_request.php" + params = OrderedDict() + params["id"] = self.serid + params["salt"] = self.get_salt() + params["curef"] = curef + params["fv"] = fv + params["tv"] = tv + params["type"] = self.ftype + params["fw_id"] = fw_id + params["mode"] = self.mode.value + params["vk"] = self.get_vk2(params, self.cltp.value) + params["cltp"] = self.cltp.value + params["cktp"] = self.cktp.value + params["rtd"] = self.rtd.value + if self.mode == self.MODE.FULL: + params["foot"] = 1 + params["chnl"] = self.chnl.value + + #print(repr(dict(params))) + req = self.sess.post(url, data=params) + if req.status_code == 200: + req.encoding = "utf-8" # Force encoding as server doesn't give one + self.write_dump(req.text) + return req.text + else: + print("REQUEST: " + repr(req)) + print(repr(req.headers)) + print(repr(req.text)) + raise SystemExit + + @staticmethod + def parse_request(xmlstr): + root = ElementTree.fromstring(xmlstr) + file = root.find("FILE_LIST").find("FILE") + fileid = file.find("FILE_ID").text + fileurl = file.find("DOWNLOAD_URL").text + s3_fileurl_node = file.find("S3_DOWNLOAD_URL") + s3_fileurl = "" + if s3_fileurl_node: + s3_fileurl = s3_fileurl_node.text + slave_list = root.find("SLAVE_LIST").findall("SLAVE") + enc_list = root.find("SLAVE_LIST").findall("ENCRYPT_SLAVE") + s3_slave_list = root.find("SLAVE_LIST").findall("S3_SLAVE") + slaves = [s.text for s in slave_list] + encslaves = [s.text for s in enc_list] + s3_slaves = [s.text for s in s3_slave_list] + return fileid, fileurl, slaves, encslaves, s3_fileurl, s3_slaves diff --git a/tcllib/xmltools.py b/tcllib/xmltools.py new file mode 100644 index 0000000..9cc8fc4 --- /dev/null +++ b/tcllib/xmltools.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- + +import xml.dom.minidom + +class XmlTools: + @staticmethod + def pretty_xml(xmlstr): + mdx = xml.dom.minidom.parseString(xmlstr) + return mdx.toprettyxml(indent=" ")