diff --git a/tcllib/__init__.py b/tcllib/__init__.py index 0f0c8ac..9d6eeee 100644 --- a/tcllib/__init__.py +++ b/tcllib/__init__.py @@ -4,4 +4,6 @@ from .ansi import * from .argparser import * -from .dumpmgr import write_info_if_dumps_found +from .devices import * +from .devlist import * +from .dumpmgr import * diff --git a/tcllib/servervote.py b/tcllib/servervote.py deleted file mode 100644 index 8755cb9..0000000 --- a/tcllib/servervote.py +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -# pylint: disable=C0111,C0326,C0103 - -"""Tools to sort API servers to find the least awful one.""" - -import numpy - - -class ServerVoteMixin: - """A mixin component for server sorting.""" - - def __init__(self): - """Populate server list and weighting variables.""" - self.g2master = None - self.master_servers = [ - "g2master-us-east.tclclouds.com", - "g2master-us-west.tclclouds.com", - "g2master-eu-west.tclclouds.com", - "g2master-ap-south.tclclouds.com", - "g2master-ap-north.tclclouds.com", - "g2master-sa-east.tclclouds.com", - ] - self.master_servers_weights = [3] * len(self.master_servers) - self.check_time_sum = 3 - self.check_time_count = 1 - - def get_master_server(self): - """Return weighted choice from server list.""" - weight_sum = 0 - for i in self.master_servers_weights: - weight_sum += i - numpy_weights = [] - for i in self.master_servers_weights: - numpy_weights.append(i/weight_sum) - return numpy.random.choice(self.master_servers, p=numpy_weights) - - def master_server_downvote(self): - """Decrease weight of a server.""" - idx = self.master_servers.index(self.g2master) - if self.master_servers_weights[idx] > 1: - self.master_servers_weights[idx] -= 1 - - def master_server_upvote(self): - """Increase weight of a server.""" - idx = self.master_servers.index(self.g2master) - if self.master_servers_weights[idx] < 10: - self.master_servers_weights[idx] += 1 - - def check_time_add(self, duration): - """Record connection time.""" - self.check_time_sum += duration - self.check_time_count += 1 - - def check_time_avg(self): - """Return average connection time.""" - return self.check_time_sum / self.check_time_count - - def master_server_vote_on_time(self, last_duration): - """Change weight of a server based on average connection time.""" - avg_duration = self.check_time_avg() - if last_duration < avg_duration - 0.5: - self.master_server_upvote() - elif last_duration > avg_duration + 0.5: - self.master_server_downvote() diff --git a/tcllib/tclcheck.py b/tcllib/tclcheck.py deleted file mode 100644 index 32d483d..0000000 --- a/tcllib/tclcheck.py +++ /dev/null @@ -1,94 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -# pylint: disable=C0111,C0326,C0103 - -"""Tools to interface with TCL's update request API.""" - -import time -from collections import OrderedDict, defaultdict - -import requests -from defusedxml import ElementTree - -from .devices import Device - - -class TclCheckMixin: - """A mixin component for TCL's update request API.""" - - def prep_check_url(self, https=True): - """Prepare URL for update request.""" - protocol = "https://" if https else "http://" - url = protocol + self.g2master + "/check.php" - return url - - def prep_check(self, device: Device, https=True): - """Prepare URL and parameters for update request.""" - url = self.prep_check_url(https) - params = OrderedDict() - params["id"] = device.imei - params["curef"] = device.curef - params["fv"] = device.fwver - params["mode"] = device.mode - params["type"] = device.type - params["cltp"] = device.cltp - params["cktp"] = device.cktp - params["rtd"] = device.rtd - params["chnl"] = device.chnl - #params["osvs"] = device.osvs - #params["ckot"] = device.ckot - return url, params - - def do_check(self, device: Device, https=True, timeout=10, max_tries=5): - """Perform update request with given parameters.""" - url, params = self.prep_check(device, https) - last_response = None - for _ in range(0, max_tries): - try: - reqtime_start = time.perf_counter() - req = self.sess.get(url, params=params, timeout=timeout) - reqtime = time.perf_counter() - reqtime_start - self.check_time_add(reqtime) - last_response = req - if req.status_code == 200: - self.master_server_vote_on_time(reqtime) - req.encoding = "utf-8" # Force encoding as server doesn't give one - self.write_dump(req.text) - return req.text - elif req.status_code not in [500, 502, 503]: - self.do_check_errorhandle(req, reqtime) - except requests.exceptions.Timeout: - pass - # Something went wrong, try a different server - self.master_server_downvote() - self.g2master = self.get_master_server() - url = self.prep_check_url(https) - raise requests.exceptions.RetryError("Max tries ({}) reached.".format(max_tries), response=last_response) - - def do_check_errorhandle(self, req, reqtime): - """Handle non-HTTP 200 results for ``do_check``.""" - errcodes = defaultdict(lambda: "HTTP {}.".format(req.status_code)) - errcodes[204] = "No update available." - errcodes[404] = "No data for requested CUREF/FV combination." - if req.status_code in [204, 404]: - self.master_server_vote_on_time(reqtime) - elif req.status_code not in [500, 502, 503]: - self.master_server_downvote() - req.raise_for_status() - raise requests.exceptions.HTTPError(errcodes[req.status_code], response=req) - - @staticmethod - def parse_check(xmlstr): - """Parse output of ``do_check``.""" - root = ElementTree.fromstring(xmlstr) - curef = root.find("CUREF").text - fvver = root.find("VERSION").find("FV").text - tvver = root.find("VERSION").find("TV").text - fw_id = root.find("FIRMWARE").find("FW_ID").text - fileinfo = root.find("FIRMWARE").find("FILESET").find("FILE") - fileid = fileinfo.find("FILE_ID").text - filename = fileinfo.find("FILENAME").text - filesize = fileinfo.find("SIZE").text - filehash = fileinfo.find("CHECKSUM").text - return curef, fvver, tvver, fw_id, fileid, filename, filesize, filehash diff --git a/tcllib/tclchecksum.py b/tcllib/tclchecksum.py deleted file mode 100644 index 84b281f..0000000 --- a/tcllib/tclchecksum.py +++ /dev/null @@ -1,56 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -# pylint: disable=C0111,C0326,C0103 - -"""Tools to interface with TCL's checksum API.""" - -import json - -from defusedxml import ElementTree - -from . import credentials - - -class TclChecksumMixin: - """A mixin component for TCL's checksum API.""" - - @staticmethod - def prep_checksum(encslave, address, uri): - """Prepare URL and parameters for checksum request.""" - url = "http://" + encslave + "/checksum.php" - params = credentials.get_creds2() - payload = {address: uri} - payload_json = json.dumps(payload) - params[b"address"] = bytes(payload_json, "utf-8") - return url, params - - def do_checksum(self, encslave, address, uri): - """Perform checksum request with given parameters.""" - url, params = self.prep_checksum(encslave, address, uri) - # print(repr(dict(params))) - req = self.sess.post(url, data=params) - if req.status_code == 200: - req.encoding = "utf-8" # Force encoding as server doesn't give one - self.write_dump(req.text) - # 2abfa6f6507044fec995efede5d818e62a0b19b5 means ERROR (invalid ADDRESS!) - if "2abfa6f6507044fec995efede5d818e62a0b19b5" in req.text: - print("INVALID URI: {}".format(uri)) - raise SystemExit - return req.text - else: - print("CHECKSUM: " + repr(req)) - print(repr(req.headers)) - print(repr(req.text)) - raise SystemExit - - @staticmethod - def parse_checksum(xmlstr): - """Parse output of ``do_checksum``.""" - root = ElementTree.fromstring(xmlstr) - file = root.find("FILE_CHECKSUM_LIST").find("FILE") - file_addr = file.find("ADDRESS").text - sha1_enc_footer = file.find("ENCRYPT_FOOTER").text - sha1_footer = file.find("FOOTER").text - sha1_body = file.find("BODY").text - return file_addr, sha1_body, sha1_enc_footer, sha1_footer diff --git a/tcllib/tclencheader.py b/tcllib/tclencheader.py deleted file mode 100644 index 33dde10..0000000 --- a/tcllib/tclencheader.py +++ /dev/null @@ -1,27 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -# pylint: disable=C0111,C0326,C0103 - -"""Tools to interface with TCL's encrypted header API.""" - -from . import credentials - - -class TclEncHeaderMixin: - """A mixin component for TCL's encrypted header API..""" - - def do_encrypt_header(self, encslave, address): - """Perform encrypted header request with given parameters.""" - params = credentials.get_creds2() - params[b"address"] = bytes(address, "utf-8") - url = "http://" + encslave + "/encrypt_header.php" - req = self.sess.post(url, data=params, verify=False) - # Expect "HTTP 206 Partial Content" response - if req.status_code == 206: - return req.content - else: - print("ENCRYPT: " + repr(req)) - print(repr(req.headers)) - print(repr(req.text)) - raise SystemExit diff --git a/tcllib/tclrequest.py b/tcllib/tclrequest.py deleted file mode 100644 index 465ebd2..0000000 --- a/tcllib/tclrequest.py +++ /dev/null @@ -1,123 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -# pylint: disable=C0111,C0326,C0103 - -"""Tools to interface with TCL's download request API.""" - -import binascii -import hashlib -import random -import time -import zlib -from collections import OrderedDict -from math import floor - -from defusedxml import ElementTree - - -''' - private HashMap buildDownloadUrisParams(UpdatePackageInfo updatePackageInfo) { - FotaLog.m28v(TAG, "doAfterCheck"); - String salt = FotaUtil.salt(); - HashMap linkedHashMap = new LinkedHashMap(); - linkedHashMap.put("id", this.internalBuilder.getParam("id")); - linkedHashMap.put("salt", salt); - linkedHashMap.put("curef", updatePackageInfo.mCuref); - linkedHashMap.put("fv", updatePackageInfo.mFv); - linkedHashMap.put("tv", updatePackageInfo.mTv); - linkedHashMap.put("type", "Firmware"); - linkedHashMap.put("fw_id", updatePackageInfo.mFirmwareId); - linkedHashMap.put("mode", "2"); - linkedHashMap.put("vk", generateVk2((LinkedHashMap) linkedHashMap.clone())); - linkedHashMap.put("cltp", "10"); - linkedHashMap.put("cktp", this.internalBuilder.getParam("cktp")); - linkedHashMap.put("rtd", this.internalBuilder.getParam("rtd")); - linkedHashMap.put("chnl", this.internalBuilder.getParam("chnl")); - return linkedHashMap; - } -''' - -VDKEY_B64Z = b"eJwdjwEOwDAIAr8kKFr//7HhmqXp8AIIDrYAgg8byiUXrwRJRXja+d6iNxu0AhUooDCN9rd6rDLxmGIakUVWo3IGCTRWqCAt6X4jGEIUAxgN0eYWnp+LkpHQAg/PsO90ELsy0Npm/n2HbtPndFgGEV31R9OmT4O4nrddjc3Qt6nWscx7e+WRHq5UnOudtjw5skuV09pFhvmqnOEIs4ljPeel1wfLYUF4\n" - - -def get_salt(): - """Generate cryptographic salt.""" - millis = floor(time.time() * 1000) - tail = "{:06d}".format(random.randint(0, 999999)) - return "{}{}".format(str(millis), tail) - - -def get_vk2(params_dict, cltp): - """Generate salted hash of API parameters.""" - params_dict["cltp"] = cltp - query = "" - for key, val in params_dict.items(): - if query: - query += "&" - query += key + "=" + str(val) - vdk = zlib.decompress(binascii.a2b_base64(VDKEY_B64Z)) - query += vdk.decode("utf-8") - engine = hashlib.sha1() - engine.update(bytes(query, "utf-8")) - hexhash = engine.hexdigest() - return hexhash - - -class TclRequestMixin: - """A mixin component for TCL's download request API.""" - - def prep_request(self, curef, fvver, tvver, fw_id): - """Prepare URL and device parameters for download request.""" - url = "https://" + self.g2master + "/download_request.php" - params = OrderedDict() - params["id"] = self.serid - params["salt"] = get_salt() - params["curef"] = curef - params["fv"] = fvver - params["tv"] = tvver - params["type"] = self.ftype - params["fw_id"] = fw_id - params["mode"] = self.mode.value - params["vk"] = get_vk2(params, self.cltp.value) - params["cltp"] = self.cltp.value - params["cktp"] = self.cktp.value - params["rtd"] = self.rtd.value - if self.mode == self.MODE.FULL: - params["foot"] = 1 - params["chnl"] = self.chnl.value - return url, params - - def do_request(self, curef, fvver, tvver, fw_id): - """Perform download request with given parameters.""" - url, params = self.prep_request(curef, fvver, tvver, fw_id) - # print(repr(dict(params))) - req = self.sess.post(url, data=params) - if req.status_code == 200: - req.encoding = "utf-8" # Force encoding as server doesn't give one - self.write_dump(req.text) - return req.text - else: - print("REQUEST: " + repr(req)) - print(repr(req.headers)) - print(repr(req.text)) - raise SystemExit - - @staticmethod - def parse_request(xmlstr): - """Parse output of ``do_request``.""" - root = ElementTree.fromstring(xmlstr) - file = root.find("FILE_LIST").find("FILE") - fileid = file.find("FILE_ID").text - fileurl = file.find("DOWNLOAD_URL").text - s3_fileurl_node = file.find("S3_DOWNLOAD_URL") - s3_fileurl = "" - if s3_fileurl_node: - s3_fileurl = s3_fileurl_node.text - slave_list = root.find("SLAVE_LIST").findall("SLAVE") - enc_list = root.find("SLAVE_LIST").findall("ENCRYPT_SLAVE") - s3_slave_list = root.find("SLAVE_LIST").findall("S3_SLAVE") - slaves = [s.text for s in slave_list] - encslaves = [s.text for s in enc_list] - s3_slaves = [s.text for s in s3_slave_list] - return fileid, fileurl, slaves, encslaves, s3_fileurl, s3_slaves diff --git a/tcllib/xmltools.py b/tcllib/xmltools.py deleted file mode 100644 index 5933a19..0000000 --- a/tcllib/xmltools.py +++ /dev/null @@ -1,14 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -# pylint: disable=C0111,C0326,C0103 - -"""XML tools.""" - -import xml.dom.minidom - - -def pretty_xml(xmlstr): - """Prettify input XML with ``xml.dom.minidom``.""" - mdx = xml.dom.minidom.parseString(xmlstr) - return mdx.toprettyxml(indent=" ")