diff --git a/tclcheck.py b/tclcheck.py index 5405049..a1b4a52 100755 --- a/tclcheck.py +++ b/tclcheck.py @@ -6,14 +6,14 @@ """Checks for the latest FULL or OTA updates for specified PRD number.""" import os -import random import sys from tcllib import argparser from tcllib.devices import Device from tcllib.dumpmgr import write_info_if_dumps_found -from tcllib.requests import RequestRunner, CheckRequest, DownloadRequest, \ - ChecksumRequest, EncryptHeaderRequest, ServerSelector +from tcllib.requests import (CheckRequest, ChecksumRequest, DownloadRequest, + EncryptHeaderRequest, RequestRunner, + ServerSelector) dpdesc = """ @@ -33,6 +33,7 @@ args = dp.parse_args(sys.argv[1:]) dev = Device(args.prd[0], args.fvver) dev.imei = "3531510" + def sel_mode(txtmode, autoval, rawval): """Handle custom mode.""" if rawval: @@ -43,6 +44,7 @@ def sel_mode(txtmode, autoval, rawval): return dev.MODE_STATES["OTA"] return dev.MODE_STATES["FULL"] + def sel_cltp(txtmode, autoval, rawval): """Handle custom CLTP.""" if rawval: @@ -53,6 +55,7 @@ def sel_cltp(txtmode, autoval, rawval): return dev.CLTP_STATES["DESKTOP"] return dev.CLTP_STATES["MOBILE"] + if args.imei: print("Use specified IMEI: {}".format(args.imei)) dev.imei = args.imei diff --git a/tclcheck_allfull.py b/tclcheck_allfull.py index 1b76ae0..41ff399 100644 --- a/tclcheck_allfull.py +++ b/tclcheck_allfull.py @@ -10,7 +10,7 @@ import sys from tcllib import ansi, argparser, devlist from tcllib.devices import DesktopDevice from tcllib.dumpmgr import write_info_if_dumps_found -from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector +from tcllib.requests import CheckRequest, RequestRunner, ServerVoteSelector dev = DesktopDevice() diff --git a/tclcheck_allota.py b/tclcheck_allota.py index 044726a..26a1b47 100644 --- a/tclcheck_allota.py +++ b/tclcheck_allota.py @@ -10,7 +10,7 @@ import sys from tcllib import ansi, argparser, devlist from tcllib.devices import MobileDevice from tcllib.dumpmgr import write_info_if_dumps_found -from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector +from tcllib.requests import CheckRequest, RequestRunner, ServerVoteSelector dev = MobileDevice() diff --git a/tclcheck_findprd.py b/tclcheck_findprd.py index 3d8a168..d4ee95b 100755 --- a/tclcheck_findprd.py +++ b/tclcheck_findprd.py @@ -11,7 +11,7 @@ import sys from tcllib import ansi, argparser, devlist from tcllib.devices import DesktopDevice from tcllib.dumpmgr import write_info_if_dumps_found -from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector +from tcllib.requests import CheckRequest, RequestRunner, ServerVoteSelector dpdesc = """ diff --git a/tclcheck_findprd2.py b/tclcheck_findprd2.py index a166ef9..2fa05cc 100644 --- a/tclcheck_findprd2.py +++ b/tclcheck_findprd2.py @@ -10,7 +10,7 @@ import sys from tcllib import ansi, argparser, devlist from tcllib.devices import DesktopDevice from tcllib.dumpmgr import write_info_if_dumps_found -from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector +from tcllib.requests import CheckRequest, RequestRunner, ServerVoteSelector # Variants to scan for diff --git a/tclcheck_findver.py b/tclcheck_findver.py index 808e9ca..9f06b6f 100755 --- a/tclcheck_findver.py +++ b/tclcheck_findver.py @@ -7,10 +7,10 @@ import sys -from tcllib import ansi, argparser, devlist +from tcllib import ansi, argparser from tcllib.devices import MobileDevice from tcllib.dumpmgr import write_info_if_dumps_found -from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector +from tcllib.requests import CheckRequest, RequestRunner, ServerVoteSelector dpdesc = """ diff --git a/tclcheck_gapfill.py b/tclcheck_gapfill.py index 9e11604..9972c02 100644 --- a/tclcheck_gapfill.py +++ b/tclcheck_gapfill.py @@ -6,12 +6,12 @@ """Query existence of missing OTAs.""" import json + import requests -from tcllib import ansi from tcllib.devices import MobileDevice from tcllib.dumpmgr import write_info_if_dumps_found -from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector +from tcllib.requests import CheckRequest, RequestRunner, ServerVoteSelector # 1. Fetch list of missing OTAs (e.g. from ancient versions to current) diff --git a/tclchksum.py b/tclchksum.py index 18a3d0c..755fad6 100644 --- a/tclchksum.py +++ b/tclchksum.py @@ -8,7 +8,7 @@ import sys from tcllib import argparser -from tcllib.requests import RequestRunner, ChecksumRequest, ServerSelector +from tcllib.requests import ChecksumRequest, RequestRunner, ServerSelector encslaves = [ "54.238.56.196", diff --git a/tcldown.py b/tcldown.py index 73fb3dd..12e7d30 100644 --- a/tcldown.py +++ b/tcldown.py @@ -11,8 +11,8 @@ import sys from tcllib import argparser from tcllib.devices import DesktopDevice from tcllib.dumpmgr import write_info_if_dumps_found -from tcllib.requests import RequestRunner, CheckRequest, DownloadRequest, \ - ChecksumRequest, EncryptHeaderRequest, ServerSelector +from tcllib.requests import (DownloadRequest, EncryptHeaderRequest, + RequestRunner, ServerSelector) dpdesc = """ @@ -31,6 +31,7 @@ args = dp.parse_args(sys.argv[1:]) dev = DesktopDevice() + def sel_mode(defaultmode, rawval): """Handle custom mode.""" if rawval: diff --git a/tcllib/__init__.py b/tcllib/__init__.py index 9d6eeee..b940875 100644 --- a/tcllib/__init__.py +++ b/tcllib/__init__.py @@ -1,3 +1,4 @@ +#!/usr/bin/env python3 # -*- coding: utf-8 -*- """Library for TCL API work and related functions.""" diff --git a/tcllib/devices.py b/tcllib/devices.py index bcd8b97..f55444b 100644 --- a/tcllib/devices.py +++ b/tcllib/devices.py @@ -1,3 +1,4 @@ +#!/usr/bin/env python3 # -*- coding: utf-8 -*- """Pseudo-devices for desktop/mobile requests""" @@ -27,7 +28,7 @@ class Device(): self.chnl = self.CHNL_STATES["WIFI"] self.cktp = self.CKTP_STATES["MANUAL"] self.ckot = self.CKOT_STATES["ALL"] - self.ua = "tcl" + self.uagent = "tcl" def is_rooted(self): """Get RTD as boolean.""" @@ -71,7 +72,7 @@ class MobileDevice(Device): self.imei = "3531510" self.set_cltp("MOBILE") self.set_mode("OTA") - self.ua = "com.tcl.fota/5.1.0.2.0029.0, Android" + self.uagent = "com.tcl.fota/5.1.0.2.0029.0, Android" class DesktopDevice(Device): diff --git a/tcllib/dumpmgr.py b/tcllib/dumpmgr.py index e6bf993..182ede8 100644 --- a/tcllib/dumpmgr.py +++ b/tcllib/dumpmgr.py @@ -21,6 +21,7 @@ def get_timestamp_random(): tail = "{:06d}".format(random.randint(0, 999999)) return "{}_{}".format(str(millis), tail) + def write_info_if_dumps_found(): """Notify user to upload dumps if present.""" # To disable this info, uncomment the following line. @@ -31,6 +32,7 @@ def write_info_if_dumps_found(): print("{}There are {} logs collected in the logs/ directory.{} Please consider uploading".format(ansi.YELLOW, len(files), ansi.RESET)) print("them to https://tclota.birth-online.de/ by running {}./upload_logs.py{}.".format(ansi.CYAN, ansi.RESET)) + class DumpMgr: """A class for XML dump management.""" diff --git a/tcllib/requests/__init__.py b/tcllib/requests/__init__.py index 7872868..afe17ce 100644 --- a/tcllib/requests/__init__.py +++ b/tcllib/requests/__init__.py @@ -1,8 +1,11 @@ +#!/usr/bin/env python3 # -*- coding: utf-8 -*- +"""Library for generic TCL API requests.""" + from .checkrequest import CheckRequest -from .downloadrequest import DownloadRequest from .checksumrequest import ChecksumRequest +from .downloadrequest import DownloadRequest from .encryptheaderrequest import EncryptHeaderRequest from .runner import * from .serverselector import * diff --git a/tcllib/requests/checkrequest.py b/tcllib/requests/checkrequest.py index 353b495..db5d88b 100644 --- a/tcllib/requests/checkrequest.py +++ b/tcllib/requests/checkrequest.py @@ -1,21 +1,31 @@ +#!/usr/bin/env python3 # -*- coding: utf-8 -*- +"""Generic update check request.""" + from collections import OrderedDict + from .. import devices from .tclrequest import TclRequest from .tclresult import CheckResult + class CheckRequest(TclRequest): + """Generic update check request.""" + def __init__(self, device: devices.Device): + """Populate variables..""" super().__init__() self.uri = "/check.php" self.method = "GET" self.device = device - + def get_headers(self): - return {"User-Agent": self.device.ua} + """Return request headers.""" + return {"User-Agent": self.device.uagent} def get_params(self): + """Return request parameters.""" params = OrderedDict() params["id"] = self.device.imei params["curef"] = self.device.curef @@ -31,6 +41,7 @@ class CheckRequest(TclRequest): return params def is_done(self, http_status: int, contents: str) -> bool: + """Handle request result.""" ok_states = { 204: "No update available.", 404: "No data for requested CUREF/FV combination.", @@ -47,7 +58,7 @@ class CheckRequest(TclRequest): elif http_status not in [500, 502, 503]: # Errors OTHER than 500, 502 or 503 are probably # errors where we don't need to retry - self.error ="HTTP {}.".format(http_status) + self.error = "HTTP {}.".format(http_status) self.success = False return True return False diff --git a/tcllib/requests/checksumrequest.py b/tcllib/requests/checksumrequest.py index 2ec2f89..e2a8d77 100644 --- a/tcllib/requests/checksumrequest.py +++ b/tcllib/requests/checksumrequest.py @@ -1,13 +1,21 @@ +#!/usr/bin/env python3 # -*- coding: utf-8 -*- -from collections import OrderedDict +"""Generic file checksum request.""" + import json -from .. import credentials, devices +from collections import OrderedDict + +from .. import credentials from .tclrequest import TclRequest from .tclresult import ChecksumResult + class ChecksumRequest(TclRequest): + """Generic file checksum request.""" + def __init__(self, address, file_uri): + """Populate variables.""" super().__init__() # NOTE: THIS HAS TO BE RUN ON AN ENCSLAVE self.uri = "/checksum.php" @@ -16,9 +24,11 @@ class ChecksumRequest(TclRequest): self.file_uri = file_uri def get_headers(self): + """Return request headers.""" return {"User-Agent": "tcl"} def get_params(self): + """Return request parameters.""" params = OrderedDict() params.update(credentials.get_creds2()) payload = {self.address: self.file_uri} @@ -27,6 +37,7 @@ class ChecksumRequest(TclRequest): return params def is_done(self, http_status: int, contents: str) -> bool: + """Handle request result.""" if http_status == 200: # 2abfa6f6507044fec995efede5d818e62a0b19b5 means ERROR (invalid ADDRESS!) if "2abfa6f6507044fec995efede5d818e62a0b19b5" in contents: diff --git a/tcllib/requests/downloadrequest.py b/tcllib/requests/downloadrequest.py index 345c247..ea05b52 100644 --- a/tcllib/requests/downloadrequest.py +++ b/tcllib/requests/downloadrequest.py @@ -1,5 +1,8 @@ +#!/usr/bin/env python3 # -*- coding: utf-8 -*- +"""Generic file download request.""" + import binascii import hashlib import random @@ -7,6 +10,7 @@ import time import zlib from collections import OrderedDict from math import floor + from .. import devices from .tclrequest import TclRequest from .tclresult import DownloadResult @@ -21,6 +25,7 @@ def get_salt(): tail = "{:06d}".format(random.randint(0, 999999)) return "{}{}".format(str(millis), tail) + def get_vk2(params_dict, cltp): """Generate salted hash of API parameters.""" params_dict["cltp"] = cltp @@ -36,8 +41,12 @@ def get_vk2(params_dict, cltp): hexhash = engine.hexdigest() return hexhash + class DownloadRequest(TclRequest): + """Generic file download request.""" + def __init__(self, device: devices.Device, tvver: str, fw_id: str): + """Populate variables.""" super().__init__() self.uri = "/download_request.php" self.method = "POST" @@ -46,9 +55,11 @@ class DownloadRequest(TclRequest): self.fw_id = fw_id def get_headers(self): - return {"User-Agent": self.device.ua} + """Return request headers.""" + return {"User-Agent": self.device.uagent} def get_params(self): + """Return request parameters.""" params = OrderedDict() params["id"] = self.device.imei params["salt"] = get_salt() @@ -68,6 +79,7 @@ class DownloadRequest(TclRequest): return params def is_done(self, http_status: int, contents: str) -> bool: + """Handle request result.""" if http_status == 200: self.response = contents self.result = DownloadResult(contents) diff --git a/tcllib/requests/encryptheaderrequest.py b/tcllib/requests/encryptheaderrequest.py index e78f06a..fbaa352 100644 --- a/tcllib/requests/encryptheaderrequest.py +++ b/tcllib/requests/encryptheaderrequest.py @@ -1,12 +1,20 @@ +#!/usr/bin/env python3 # -*- coding: utf-8 -*- +"""Generic encrypted header download request.""" + from collections import OrderedDict -from .. import credentials, devices + +from .. import credentials from .tclrequest import TclRequest from .tclresult import EncryptHeaderResult + class EncryptHeaderRequest(TclRequest): + """Generic encrypted header download request.""" + def __init__(self, file_uri): + """Populate variables.""" super().__init__() # NOTE: THIS HAS TO BE RUN ON AN ENCSLAVE self.uri = "/encrypt_header.php" @@ -15,15 +23,18 @@ class EncryptHeaderRequest(TclRequest): self.file_uri = file_uri def get_headers(self): + """Return request headers.""" return {"User-Agent": "tcl"} def get_params(self): + """Return request parameters.""" params = OrderedDict() params.update(credentials.get_creds2()) params["address"] = bytes(self.file_uri, "utf-8") return params def is_done(self, http_status: int, contents: str) -> bool: + """Handle request result.""" # Expect "HTTP 206 Partial Content" response if http_status == 206: self.result = EncryptHeaderResult(contents) diff --git a/tcllib/requests/http.py b/tcllib/requests/http.py index 2b2821a..7aaf005 100644 --- a/tcllib/requests/http.py +++ b/tcllib/requests/http.py @@ -1,13 +1,21 @@ +#!/usr/bin/env python3 # -*- coding: utf-8 -*- -import requests +"""Base HTTP requests.""" + from collections import OrderedDict +import requests + + class TimeoutException(Exception): + """Ignore timeouts.""" pass + class HttpRequest: """Provides all generic features for making HTTP GET requests""" + def __init__(self, url, timeout=10): self.url = url self.params = OrderedDict() @@ -23,16 +31,18 @@ class HttpRequest: """Run query.""" try: req = self.sess.get(self.url, params=self.params, timeout=self.timeout) - except requests.exceptions.Timeout as e: - raise TimeoutException(e) + except requests.exceptions.Timeout as exc: + raise TimeoutException(exc) return req + class HttpPostRequest(HttpRequest): """Provides all generic features for making HTTP POST requests""" + def run(self): """Run query.""" try: req = self.sess.post(self.url, data=self.params, timeout=self.timeout) - except requests.exceptions.Timeout as e: - raise TimeoutException(e) + except requests.exceptions.Timeout as exc: + raise TimeoutException(exc) return req diff --git a/tcllib/requests/runner.py b/tcllib/requests/runner.py index 5569f89..cd7aca2 100644 --- a/tcllib/requests/runner.py +++ b/tcllib/requests/runner.py @@ -1,20 +1,28 @@ +#!/usr/bin/env python3 # -*- coding: utf-8 -*- +"""Generic request executors.""" + +from . import http, serverselector from .tclrequest import TclRequest -from . import http -from . import serverselector + class UnknownMethodException(Exception): + """Ignore unknown methods.""" pass + class RequestRunner: + """Generic request executor.""" + def __init__(self, server_selector: serverselector.ServerSelector, https=True): + """Populate variables.""" self.server_selector = server_selector self.protocol = "https://" if https else "http://" self.max_tries = 5 def get_http(self, method="GET") -> http.HttpRequest: - """Returns the http class according to desired method.""" + """Return the http class according to desired method.""" if method == "GET": return http.HttpRequest elif method == "POST": @@ -22,11 +30,11 @@ class RequestRunner: raise UnknownMethodException("Unknown http method: {}".format(method)) def get_server(self) -> str: - """Returns a master server.""" + """Return a master server.""" return self.server_selector.get_master_server() def run(self, query: TclRequest, timeout: int=10) -> bool: - """Runs the actual query.""" + """Run the actual query.""" for _ in range(0, self.max_tries): url = "{}{}{}".format(self.protocol, self.get_server(), query.uri) http_handler = self.get_http(query.method)(url, timeout) diff --git a/tcllib/requests/serverselector.py b/tcllib/requests/serverselector.py index cd2236f..1397512 100644 --- a/tcllib/requests/serverselector.py +++ b/tcllib/requests/serverselector.py @@ -1,12 +1,14 @@ +#!/usr/bin/env python3 # -*- coding: utf-8 -*- # pylint: disable=C0111,C0326,C0103 """Tools to sort API servers to find the least awful one.""" -import numpy import time +import numpy + MASTER_SERVERS = [ "g2master-us-east.tclclouds.com", @@ -17,6 +19,7 @@ MASTER_SERVERS = [ "g2master-sa-east.tclclouds.com", ] + class ServerSelector: """Returns a random server to use.""" @@ -45,6 +48,7 @@ class ServerSelector: """Hook to be called after request finished""" pass + class ServerVoteSelector(ServerSelector): """Tries to return faster servers more often.""" diff --git a/tcllib/requests/tclrequest.py b/tcllib/requests/tclrequest.py index 9d39607..c88492e 100644 --- a/tcllib/requests/tclrequest.py +++ b/tcllib/requests/tclrequest.py @@ -1,9 +1,16 @@ +#!/usr/bin/env python3 # -*- coding: utf-8 -*- +"""Generic TCL request object.""" + from . import tclresult + class TclRequest: + """Generic TCL request object.""" + def __init__(self): + """Populate variables.""" self.uri = "" self.rawmode = False self.response = None @@ -12,15 +19,17 @@ class TclRequest: self.success = False def get_headers(self): + """Return request headers.""" return {} def get_params(self): + """Return request parameters.""" return {} def is_done(self, http_status: int, contents: str): - """Checks if query is done or needs retry.""" + """Check if query is done or needs retry.""" return False def get_result(self) -> tclresult.TclResult: - """Returns Result object.""" + """Return Result object.""" return self.result diff --git a/tcllib/requests/tclresult.py b/tcllib/requests/tclresult.py index 6b526b7..cb02852 100644 --- a/tcllib/requests/tclresult.py +++ b/tcllib/requests/tclresult.py @@ -1,5 +1,8 @@ +#!/usr/bin/env python3 # -*- coding: utf-8 -*- +"""Generic TCL API result handlers.""" + import xml.dom.minidom from defusedxml import ElementTree @@ -8,12 +11,16 @@ from .. import dumpmgr class TclResult: + """Generic TCL API result.""" + def __init__(self, xmlstr: str): + """Populate variables.""" self.raw_xml = xmlstr self.dumper = dumpmgr.DumpMgr() self.dumper.write_dump(xmlstr) def delete_dump(self): + """Delete last dump.""" self.dumper.delete_last_dump() def pretty_xml(self): @@ -21,8 +28,12 @@ class TclResult: mdx = xml.dom.minidom.parseString(self.raw_xml) return mdx.toprettyxml(indent=" ") + class CheckResult(TclResult): + """Handle check request result.""" + def __init__(self, xmlstr: str): + """Extract data from check request result.""" super().__init__(xmlstr) root = ElementTree.fromstring(xmlstr) self.curef = root.find("CUREF").text @@ -35,8 +46,12 @@ class CheckResult(TclResult): self.filesize = fileinfo.find("SIZE").text self.filehash = fileinfo.find("CHECKSUM").text + class DownloadResult(TclResult): + """Handle download request result.""" + def __init__(self, xmlstr: str): + """Extract data from download request result.""" super().__init__(xmlstr) root = ElementTree.fromstring(xmlstr) file = root.find("FILE_LIST").find("FILE") @@ -53,8 +68,12 @@ class DownloadResult(TclResult): self.encslaves = [s.text for s in enc_list] self.s3_slaves = [s.text for s in s3_slave_list] + class ChecksumResult(TclResult): + """Handle checksum request result.""" + def __init__(self, xmlstr: str): + """Extract data from checksum request result.""" super().__init__(xmlstr) root = ElementTree.fromstring(xmlstr) file = root.find("FILE_CHECKSUM_LIST").find("FILE") @@ -63,6 +82,10 @@ class ChecksumResult(TclResult): self.sha1_footer = file.find("FOOTER").text self.sha1_body = file.find("BODY").text + class EncryptHeaderResult(TclResult): + """Handle encrypted header request result.""" + def __init__(self, contents: str): + """Extract data from encrypted header request result.""" self.rawdata = contents