1
0
mirror of https://github.com/mbirth/tcl_ota_check.git synced 2024-11-09 22:06:47 +00:00

More cleanup.

This commit is contained in:
Markus Birth 2018-02-11 01:51:24 +01:00
parent e582642936
commit bab58107fe
Signed by: mbirth
GPG Key ID: A9928D7A098C3A9A
7 changed files with 3 additions and 381 deletions

View File

@ -4,4 +4,6 @@
from .ansi import * from .ansi import *
from .argparser import * from .argparser import *
from .dumpmgr import write_info_if_dumps_found from .devices import *
from .devlist import *
from .dumpmgr import *

View File

@ -1,66 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Tools to sort API servers to find the least awful one."""
import numpy
class ServerVoteMixin:
"""A mixin component for server sorting."""
def __init__(self):
"""Populate server list and weighting variables."""
self.g2master = None
self.master_servers = [
"g2master-us-east.tclclouds.com",
"g2master-us-west.tclclouds.com",
"g2master-eu-west.tclclouds.com",
"g2master-ap-south.tclclouds.com",
"g2master-ap-north.tclclouds.com",
"g2master-sa-east.tclclouds.com",
]
self.master_servers_weights = [3] * len(self.master_servers)
self.check_time_sum = 3
self.check_time_count = 1
def get_master_server(self):
"""Return weighted choice from server list."""
weight_sum = 0
for i in self.master_servers_weights:
weight_sum += i
numpy_weights = []
for i in self.master_servers_weights:
numpy_weights.append(i/weight_sum)
return numpy.random.choice(self.master_servers, p=numpy_weights)
def master_server_downvote(self):
"""Decrease weight of a server."""
idx = self.master_servers.index(self.g2master)
if self.master_servers_weights[idx] > 1:
self.master_servers_weights[idx] -= 1
def master_server_upvote(self):
"""Increase weight of a server."""
idx = self.master_servers.index(self.g2master)
if self.master_servers_weights[idx] < 10:
self.master_servers_weights[idx] += 1
def check_time_add(self, duration):
"""Record connection time."""
self.check_time_sum += duration
self.check_time_count += 1
def check_time_avg(self):
"""Return average connection time."""
return self.check_time_sum / self.check_time_count
def master_server_vote_on_time(self, last_duration):
"""Change weight of a server based on average connection time."""
avg_duration = self.check_time_avg()
if last_duration < avg_duration - 0.5:
self.master_server_upvote()
elif last_duration > avg_duration + 0.5:
self.master_server_downvote()

View File

@ -1,94 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Tools to interface with TCL's update request API."""
import time
from collections import OrderedDict, defaultdict
import requests
from defusedxml import ElementTree
from .devices import Device
class TclCheckMixin:
"""A mixin component for TCL's update request API."""
def prep_check_url(self, https=True):
"""Prepare URL for update request."""
protocol = "https://" if https else "http://"
url = protocol + self.g2master + "/check.php"
return url
def prep_check(self, device: Device, https=True):
"""Prepare URL and parameters for update request."""
url = self.prep_check_url(https)
params = OrderedDict()
params["id"] = device.imei
params["curef"] = device.curef
params["fv"] = device.fwver
params["mode"] = device.mode
params["type"] = device.type
params["cltp"] = device.cltp
params["cktp"] = device.cktp
params["rtd"] = device.rtd
params["chnl"] = device.chnl
#params["osvs"] = device.osvs
#params["ckot"] = device.ckot
return url, params
def do_check(self, device: Device, https=True, timeout=10, max_tries=5):
"""Perform update request with given parameters."""
url, params = self.prep_check(device, https)
last_response = None
for _ in range(0, max_tries):
try:
reqtime_start = time.perf_counter()
req = self.sess.get(url, params=params, timeout=timeout)
reqtime = time.perf_counter() - reqtime_start
self.check_time_add(reqtime)
last_response = req
if req.status_code == 200:
self.master_server_vote_on_time(reqtime)
req.encoding = "utf-8" # Force encoding as server doesn't give one
self.write_dump(req.text)
return req.text
elif req.status_code not in [500, 502, 503]:
self.do_check_errorhandle(req, reqtime)
except requests.exceptions.Timeout:
pass
# Something went wrong, try a different server
self.master_server_downvote()
self.g2master = self.get_master_server()
url = self.prep_check_url(https)
raise requests.exceptions.RetryError("Max tries ({}) reached.".format(max_tries), response=last_response)
def do_check_errorhandle(self, req, reqtime):
"""Handle non-HTTP 200 results for ``do_check``."""
errcodes = defaultdict(lambda: "HTTP {}.".format(req.status_code))
errcodes[204] = "No update available."
errcodes[404] = "No data for requested CUREF/FV combination."
if req.status_code in [204, 404]:
self.master_server_vote_on_time(reqtime)
elif req.status_code not in [500, 502, 503]:
self.master_server_downvote()
req.raise_for_status()
raise requests.exceptions.HTTPError(errcodes[req.status_code], response=req)
@staticmethod
def parse_check(xmlstr):
"""Parse output of ``do_check``."""
root = ElementTree.fromstring(xmlstr)
curef = root.find("CUREF").text
fvver = root.find("VERSION").find("FV").text
tvver = root.find("VERSION").find("TV").text
fw_id = root.find("FIRMWARE").find("FW_ID").text
fileinfo = root.find("FIRMWARE").find("FILESET").find("FILE")
fileid = fileinfo.find("FILE_ID").text
filename = fileinfo.find("FILENAME").text
filesize = fileinfo.find("SIZE").text
filehash = fileinfo.find("CHECKSUM").text
return curef, fvver, tvver, fw_id, fileid, filename, filesize, filehash

View File

@ -1,56 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Tools to interface with TCL's checksum API."""
import json
from defusedxml import ElementTree
from . import credentials
class TclChecksumMixin:
"""A mixin component for TCL's checksum API."""
@staticmethod
def prep_checksum(encslave, address, uri):
"""Prepare URL and parameters for checksum request."""
url = "http://" + encslave + "/checksum.php"
params = credentials.get_creds2()
payload = {address: uri}
payload_json = json.dumps(payload)
params[b"address"] = bytes(payload_json, "utf-8")
return url, params
def do_checksum(self, encslave, address, uri):
"""Perform checksum request with given parameters."""
url, params = self.prep_checksum(encslave, address, uri)
# print(repr(dict(params)))
req = self.sess.post(url, data=params)
if req.status_code == 200:
req.encoding = "utf-8" # Force encoding as server doesn't give one
self.write_dump(req.text)
# <ENCRYPT_FOOTER>2abfa6f6507044fec995efede5d818e62a0b19b5</ENCRYPT_FOOTER> means ERROR (invalid ADDRESS!)
if "<ENCRYPT_FOOTER>2abfa6f6507044fec995efede5d818e62a0b19b5</ENCRYPT_FOOTER>" in req.text:
print("INVALID URI: {}".format(uri))
raise SystemExit
return req.text
else:
print("CHECKSUM: " + repr(req))
print(repr(req.headers))
print(repr(req.text))
raise SystemExit
@staticmethod
def parse_checksum(xmlstr):
"""Parse output of ``do_checksum``."""
root = ElementTree.fromstring(xmlstr)
file = root.find("FILE_CHECKSUM_LIST").find("FILE")
file_addr = file.find("ADDRESS").text
sha1_enc_footer = file.find("ENCRYPT_FOOTER").text
sha1_footer = file.find("FOOTER").text
sha1_body = file.find("BODY").text
return file_addr, sha1_body, sha1_enc_footer, sha1_footer

View File

@ -1,27 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Tools to interface with TCL's encrypted header API."""
from . import credentials
class TclEncHeaderMixin:
"""A mixin component for TCL's encrypted header API.."""
def do_encrypt_header(self, encslave, address):
"""Perform encrypted header request with given parameters."""
params = credentials.get_creds2()
params[b"address"] = bytes(address, "utf-8")
url = "http://" + encslave + "/encrypt_header.php"
req = self.sess.post(url, data=params, verify=False)
# Expect "HTTP 206 Partial Content" response
if req.status_code == 206:
return req.content
else:
print("ENCRYPT: " + repr(req))
print(repr(req.headers))
print(repr(req.text))
raise SystemExit

View File

@ -1,123 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Tools to interface with TCL's download request API."""
import binascii
import hashlib
import random
import time
import zlib
from collections import OrderedDict
from math import floor
from defusedxml import ElementTree
'''
private HashMap<String, String> buildDownloadUrisParams(UpdatePackageInfo updatePackageInfo) {
FotaLog.m28v(TAG, "doAfterCheck");
String salt = FotaUtil.salt();
HashMap linkedHashMap = new LinkedHashMap();
linkedHashMap.put("id", this.internalBuilder.getParam("id"));
linkedHashMap.put("salt", salt);
linkedHashMap.put("curef", updatePackageInfo.mCuref);
linkedHashMap.put("fv", updatePackageInfo.mFv);
linkedHashMap.put("tv", updatePackageInfo.mTv);
linkedHashMap.put("type", "Firmware");
linkedHashMap.put("fw_id", updatePackageInfo.mFirmwareId);
linkedHashMap.put("mode", "2");
linkedHashMap.put("vk", generateVk2((LinkedHashMap) linkedHashMap.clone()));
linkedHashMap.put("cltp", "10");
linkedHashMap.put("cktp", this.internalBuilder.getParam("cktp"));
linkedHashMap.put("rtd", this.internalBuilder.getParam("rtd"));
linkedHashMap.put("chnl", this.internalBuilder.getParam("chnl"));
return linkedHashMap;
}
'''
VDKEY_B64Z = b"eJwdjwEOwDAIAr8kKFr//7HhmqXp8AIIDrYAgg8byiUXrwRJRXja+d6iNxu0AhUooDCN9rd6rDLxmGIakUVWo3IGCTRWqCAt6X4jGEIUAxgN0eYWnp+LkpHQAg/PsO90ELsy0Npm/n2HbtPndFgGEV31R9OmT4O4nrddjc3Qt6nWscx7e+WRHq5UnOudtjw5skuV09pFhvmqnOEIs4ljPeel1wfLYUF4\n"
def get_salt():
"""Generate cryptographic salt."""
millis = floor(time.time() * 1000)
tail = "{:06d}".format(random.randint(0, 999999))
return "{}{}".format(str(millis), tail)
def get_vk2(params_dict, cltp):
"""Generate salted hash of API parameters."""
params_dict["cltp"] = cltp
query = ""
for key, val in params_dict.items():
if query:
query += "&"
query += key + "=" + str(val)
vdk = zlib.decompress(binascii.a2b_base64(VDKEY_B64Z))
query += vdk.decode("utf-8")
engine = hashlib.sha1()
engine.update(bytes(query, "utf-8"))
hexhash = engine.hexdigest()
return hexhash
class TclRequestMixin:
"""A mixin component for TCL's download request API."""
def prep_request(self, curef, fvver, tvver, fw_id):
"""Prepare URL and device parameters for download request."""
url = "https://" + self.g2master + "/download_request.php"
params = OrderedDict()
params["id"] = self.serid
params["salt"] = get_salt()
params["curef"] = curef
params["fv"] = fvver
params["tv"] = tvver
params["type"] = self.ftype
params["fw_id"] = fw_id
params["mode"] = self.mode.value
params["vk"] = get_vk2(params, self.cltp.value)
params["cltp"] = self.cltp.value
params["cktp"] = self.cktp.value
params["rtd"] = self.rtd.value
if self.mode == self.MODE.FULL:
params["foot"] = 1
params["chnl"] = self.chnl.value
return url, params
def do_request(self, curef, fvver, tvver, fw_id):
"""Perform download request with given parameters."""
url, params = self.prep_request(curef, fvver, tvver, fw_id)
# print(repr(dict(params)))
req = self.sess.post(url, data=params)
if req.status_code == 200:
req.encoding = "utf-8" # Force encoding as server doesn't give one
self.write_dump(req.text)
return req.text
else:
print("REQUEST: " + repr(req))
print(repr(req.headers))
print(repr(req.text))
raise SystemExit
@staticmethod
def parse_request(xmlstr):
"""Parse output of ``do_request``."""
root = ElementTree.fromstring(xmlstr)
file = root.find("FILE_LIST").find("FILE")
fileid = file.find("FILE_ID").text
fileurl = file.find("DOWNLOAD_URL").text
s3_fileurl_node = file.find("S3_DOWNLOAD_URL")
s3_fileurl = ""
if s3_fileurl_node:
s3_fileurl = s3_fileurl_node.text
slave_list = root.find("SLAVE_LIST").findall("SLAVE")
enc_list = root.find("SLAVE_LIST").findall("ENCRYPT_SLAVE")
s3_slave_list = root.find("SLAVE_LIST").findall("S3_SLAVE")
slaves = [s.text for s in slave_list]
encslaves = [s.text for s in enc_list]
s3_slaves = [s.text for s in s3_slave_list]
return fileid, fileurl, slaves, encslaves, s3_fileurl, s3_slaves

View File

@ -1,14 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""XML tools."""
import xml.dom.minidom
def pretty_xml(xmlstr):
"""Prettify input XML with ``xml.dom.minidom``."""
mdx = xml.dom.minidom.parseString(xmlstr)
return mdx.toprettyxml(indent=" ")