1
0
mirror of https://github.com/mbirth/tcl_ota_check.git synced 2024-09-19 22:33:25 +01:00

do_check error handler refactor + pylint stuff

This commit is contained in:
thurask 2018-02-06 13:27:36 -05:00
parent 86e01c1e33
commit dbaced1fe8
No known key found for this signature in database
GPG Key ID: A6CCCDEA29795048
10 changed files with 51 additions and 18 deletions

View File

@ -11,6 +11,7 @@ import webbrowser
class DefaultParser(argparse.ArgumentParser): class DefaultParser(argparse.ArgumentParser):
"""argparse parser with some defaults set.""" """argparse parser with some defaults set."""
def __init__(self, appname, desc=None): def __init__(self, appname, desc=None):
"""Set default name, description, epilogue, arguments.""" """Set default name, description, epilogue, arguments."""
homeurl = "https://github.com/mbirth/tcl_ota_check" homeurl = "https://github.com/mbirth/tcl_ota_check"

View File

@ -17,6 +17,7 @@ def get_creds():
params = {base64.b64decode(key): base64.b64decode(val) for key, val in creds.items()} params = {base64.b64decode(key): base64.b64decode(val) for key, val in creds.items()}
return params return params
def get_creds2(): def get_creds2():
"""Return alternate authentication.""" """Return alternate authentication."""
creds = { creds = {

View File

@ -2,6 +2,7 @@
"""Pseudo-devices for desktop/mobile requests""" """Pseudo-devices for desktop/mobile requests"""
class Device(): class Device():
"""Generic pseudo-device class.""" """Generic pseudo-device class."""
CLTP_STATES = { CLTP_STATES = {
@ -60,18 +61,24 @@ class Device():
# Throws exception when invalid mode given: # Throws exception when invalid mode given:
self.ckot = self.CKOT_STATES[new_ckot] self.ckot = self.CKOT_STATES[new_ckot]
class MobileDevice(Device): class MobileDevice(Device):
"""Generic mobile (i.e. OTA) device.""" """Generic mobile (i.e. OTA) device."""
def __init__(self, curef="PRD-63117-011", fwver="AAO472"): def __init__(self, curef="PRD-63117-011", fwver="AAO472"):
"""Populate variables."""
super().__init__(curef, fwver) super().__init__(curef, fwver)
self.imei = "3531510" self.imei = "3531510"
self.set_cltp("MOBILE") self.set_cltp("MOBILE")
self.set_mode("OTA") self.set_mode("OTA")
self.ua = "com.tcl.fota/5.1.0.2.0029.0, Android" self.ua = "com.tcl.fota/5.1.0.2.0029.0, Android"
class DesktopDevice(Device): class DesktopDevice(Device):
"""Generic desktop (i.e. full) device.""" """Generic desktop (i.e. full) device."""
def __init__(self, curef="PRD-63117-011", fwver="AAA000"): def __init__(self, curef="PRD-63117-011", fwver="AAA000"):
"""Populate variables."""
super().__init__(curef, fwver) super().__init__(curef, fwver)
self.imei = "543212345000000" self.imei = "543212345000000"
self.set_cltp("DESKTOP") self.set_cltp("DESKTOP")

View File

@ -32,6 +32,7 @@ def load_local_devicelist():
except FileNotFoundError: except FileNotFoundError:
return None, True return None, True
def get_devicelist(force=False, output_diff=True): def get_devicelist(force=False, output_diff=True):
"""Return device list from saved database.""" """Return device list from saved database."""
old_prds, need_download = load_local_devicelist() old_prds, need_download = load_local_devicelist()
@ -49,7 +50,9 @@ def get_devicelist(force=False, output_diff=True):
return prds return prds
def print_versions_diff(old_data, new_data): def print_versions_diff(old_data, new_data):
"""Print version changes between old and new databases."""
prd = new_data["curef"] prd = new_data["curef"]
if new_data["last_full"] != old_data["last_full"] and new_data["last_ota"] != old_data["last_ota"]: if new_data["last_full"] != old_data["last_full"] and new_data["last_ota"] != old_data["last_ota"]:
print("> {}: {}{} (OTA: {}{})".format( print("> {}: {}{} (OTA: {}{})".format(
@ -64,8 +67,9 @@ def print_versions_diff(old_data, new_data):
elif new_data["last_ota"] != old_data["last_ota"]: elif new_data["last_ota"] != old_data["last_ota"]:
print("> {}: {}{} (OTA)".format(prd, ansi.YELLOW_DARK + str(old_data["last_ota"]) + ansi.RESET, ansi.YELLOW + str(new_data["last_ota"]) + ansi.RESET)) print("> {}: {}{} (OTA)".format(prd, ansi.YELLOW_DARK + str(old_data["last_ota"]) + ansi.RESET, ansi.YELLOW + str(new_data["last_ota"]) + ansi.RESET))
def print_prd_diff(old_prds, new_prds): def print_prd_diff(old_prds, new_prds):
"""Print changes between old and new databases.""" """Print PRD changes between old and new databases."""
added_prds = [prd for prd in new_prds if prd not in old_prds] added_prds = [prd for prd in new_prds if prd not in old_prds]
removed_prds = [prd for prd in old_prds if prd not in new_prds] removed_prds = [prd for prd in old_prds if prd not in new_prds]
for prd in removed_prds: for prd in removed_prds:

View File

@ -17,6 +17,7 @@ from . import ansi
class DumpMgrMixin: class DumpMgrMixin:
"""A mixin component for XML dump management.""" """A mixin component for XML dump management."""
def __init__(self): def __init__(self):
"""Populate dump file name.""" """Populate dump file name."""
self.last_dump_filename = None self.last_dump_filename = None

View File

@ -10,6 +10,7 @@ import numpy
class ServerVoteMixin: class ServerVoteMixin:
"""A mixin component for server sorting.""" """A mixin component for server sorting."""
def __init__(self): def __init__(self):
"""Populate server list and weighting variables.""" """Populate server list and weighting variables."""
self.g2master = None self.g2master = None

View File

@ -6,7 +6,7 @@
"""Tools to interface with TCL's update request API.""" """Tools to interface with TCL's update request API."""
import time import time
from collections import OrderedDict from collections import OrderedDict, defaultdict
import requests import requests
from defusedxml import ElementTree from defusedxml import ElementTree
@ -14,10 +14,16 @@ from defusedxml import ElementTree
class TclCheckMixin: class TclCheckMixin:
"""A mixin component for TCL's update request API.""" """A mixin component for TCL's update request API."""
def prep_check(self, device=None, https=True):
"""Prepare URL and parameters for update request.""" def prep_check_url(self, https=True):
"""Prepare URL for update request."""
protocol = "https://" if https else "http://" protocol = "https://" if https else "http://"
url = protocol + self.g2master + "/check.php" url = protocol + self.g2master + "/check.php"
return url
def prep_check(self, device=None, https=True):
"""Prepare URL and parameters for update request."""
url = self.prep_check_url(https)
params = OrderedDict() params = OrderedDict()
if device: if device:
# Need to support both ways for now # Need to support both ways for now
@ -60,28 +66,31 @@ class TclCheckMixin:
last_response = req last_response = req
if req.status_code == 200: if req.status_code == 200:
self.master_server_vote_on_time(reqtime, reqtime_avg) self.master_server_vote_on_time(reqtime, reqtime_avg)
req.encoding = "utf-8" # Force encoding as server doesn't give one req.encoding = "utf-8" # Force encoding as server doesn't give one
self.write_dump(req.text) self.write_dump(req.text)
return req.text return req.text
elif req.status_code == 204:
self.master_server_vote_on_time(reqtime, reqtime_avg)
raise requests.exceptions.HTTPError("No update available.", response=req)
elif req.status_code == 404:
self.master_server_vote_on_time(reqtime, reqtime_avg)
raise requests.exceptions.HTTPError("No data for requested CUREF/FV combination.", response=req)
elif req.status_code not in [500, 502, 503]: elif req.status_code not in [500, 502, 503]:
self.master_server_downvote() self.do_check_errorhandle(req, reqtime, reqtime_avg)
req.raise_for_status()
raise requests.exceptions.HTTPError("HTTP {}.".format(req.status_code), response=req)
except requests.exceptions.Timeout: except requests.exceptions.Timeout:
pass pass
# Something went wrong, try a different server # Something went wrong, try a different server
self.master_server_downvote() self.master_server_downvote()
self.g2master = self.get_master_server() self.g2master = self.get_master_server()
protocol = "https://" if https else "http://" url = self.prep_check_url(https)
url = protocol + self.g2master + "/check.php"
raise requests.exceptions.RetryError("Max tries ({}) reached.".format(max_tries), response=last_response) raise requests.exceptions.RetryError("Max tries ({}) reached.".format(max_tries), response=last_response)
def do_check_errorhandle(self, req, reqtime, reqtime_avg):
"""Handle non-HTTP 200 results for ``do_check``."""
errcodes = defaultdict(lambda: "HTTP {}.".format(req.status_code))
errcodes[204] = "No update available."
errcodes[404] = "No data for requested CUREF/FV combination."
if req.status_code in [204, 404]:
self.master_server_vote_on_time(reqtime, reqtime_avg)
elif req.status_code not in [500, 502, 503]:
self.master_server_downvote()
req.raise_for_status()
raise requests.exceptions.HTTPError(errcodes[req.status_code], response=req)
@staticmethod @staticmethod
def parse_check(xmlstr): def parse_check(xmlstr):
"""Parse output of ``do_check``.""" """Parse output of ``do_check``."""

View File

@ -14,6 +14,7 @@ from . import credentials
class TclChecksumMixin: class TclChecksumMixin:
"""A mixin component for TCL's checksum API.""" """A mixin component for TCL's checksum API."""
def do_checksum(self, encslave, address, uri): def do_checksum(self, encslave, address, uri):
"""Perform checksum request with given parameters.""" """Perform checksum request with given parameters."""
url = "http://" + encslave + "/checksum.php" url = "http://" + encslave + "/checksum.php"

View File

@ -10,6 +10,7 @@ from . import credentials
class TclEncHeaderMixin: class TclEncHeaderMixin:
"""A mixin component for TCL's encrypted header API..""" """A mixin component for TCL's encrypted header API.."""
def do_encrypt_header(self, encslave, address): def do_encrypt_header(self, encslave, address):
"""Perform encrypted header request with given parameters.""" """Perform encrypted header request with given parameters."""
params = credentials.get_creds2() params = credentials.get_creds2()

View File

@ -40,12 +40,14 @@ from defusedxml import ElementTree
VDKEY_B64Z = b"eJwdjwEOwDAIAr8kKFr//7HhmqXp8AIIDrYAgg8byiUXrwRJRXja+d6iNxu0AhUooDCN9rd6rDLxmGIakUVWo3IGCTRWqCAt6X4jGEIUAxgN0eYWnp+LkpHQAg/PsO90ELsy0Npm/n2HbtPndFgGEV31R9OmT4O4nrddjc3Qt6nWscx7e+WRHq5UnOudtjw5skuV09pFhvmqnOEIs4ljPeel1wfLYUF4\n" VDKEY_B64Z = b"eJwdjwEOwDAIAr8kKFr//7HhmqXp8AIIDrYAgg8byiUXrwRJRXja+d6iNxu0AhUooDCN9rd6rDLxmGIakUVWo3IGCTRWqCAt6X4jGEIUAxgN0eYWnp+LkpHQAg/PsO90ELsy0Npm/n2HbtPndFgGEV31R9OmT4O4nrddjc3Qt6nWscx7e+WRHq5UnOudtjw5skuV09pFhvmqnOEIs4ljPeel1wfLYUF4\n"
def get_salt(): def get_salt():
"""Generate cryptographic salt.""" """Generate cryptographic salt."""
millis = floor(time.time() * 1000) millis = floor(time.time() * 1000)
tail = "{:06d}".format(random.randint(0, 999999)) tail = "{:06d}".format(random.randint(0, 999999))
return "{}{}".format(str(millis), tail) return "{}{}".format(str(millis), tail)
def get_vk2(params_dict, cltp): def get_vk2(params_dict, cltp):
"""Generate salted hash of API parameters.""" """Generate salted hash of API parameters."""
params_dict["cltp"] = cltp params_dict["cltp"] = cltp
@ -61,11 +63,12 @@ def get_vk2(params_dict, cltp):
hexhash = engine.hexdigest() hexhash = engine.hexdigest()
return hexhash return hexhash
class TclRequestMixin: class TclRequestMixin:
"""A mixin component for TCL's download request API.""" """A mixin component for TCL's download request API."""
def do_request(self, curef, fvver, tvver, fw_id): def prep_request(self, curef, fvver, tvver, fw_id):
"""Perform download request with given parameters.""" """Prepare URL and device parameters for download request."""
url = "https://" + self.g2master + "/download_request.php" url = "https://" + self.g2master + "/download_request.php"
params = OrderedDict() params = OrderedDict()
params["id"] = self.serid params["id"] = self.serid
@ -83,7 +86,11 @@ class TclRequestMixin:
if self.mode == self.MODE.FULL: if self.mode == self.MODE.FULL:
params["foot"] = 1 params["foot"] = 1
params["chnl"] = self.chnl.value params["chnl"] = self.chnl.value
return url, params
def do_request(self, curef, fvver, tvver, fw_id):
"""Perform download request with given parameters."""
url, params = self.prep_request(curef, fvver, tvver, fw_id)
# print(repr(dict(params))) # print(repr(dict(params)))
req = self.sess.post(url, data=params) req = self.sess.post(url, data=params)
if req.status_code == 200: if req.status_code == 200: