1
0
mirror of https://github.com/mbirth/tcl_ota_check.git synced 2024-09-19 22:33:25 +01:00

Initial rewrite.

This commit is contained in:
Markus Birth 2018-02-08 01:15:58 +01:00
parent 28ed2365ab
commit 1297b31b2e
Signed by: mbirth
GPG Key ID: A9928D7A098C3A9A
7 changed files with 329 additions and 15 deletions

View File

@ -7,16 +7,14 @@
import sys
from requests.exceptions import RequestException
import tcllib
import tcllib.argparser
from tcllib import ansi, devlist
from tcllib.devices import MobileDevice
from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector
dev = MobileDevice()
fc = tcllib.FotaCheck()
dpdesc = """
Checks for the latest OTA updates for all PRD numbers or only for the PRD specified
@ -39,23 +37,26 @@ prds = devlist.get_devicelist()
print("List of latest OTA firmware{} by PRD:".format(force_ver_text))
runner = RequestRunner(ServerVoteSelector())
runner.max_tries = 20
for prd, variant in prds.items():
model = variant["variant"]
lastver = variant["last_ota"]
lastver = variant["last_full"] if lastver is None else lastver
if args.forcever is not None:
lastver = args.forcever
if prdcheck in prd:
try:
dev.curef = prd
dev.fwver = lastver
fc.reset_session(dev)
check_xml = fc.do_check(dev, max_tries=20)
curef, fv, tv, fw_id, fileid, fn, fsize, fhash = fc.parse_check(check_xml)
versioninfo = ansi.YELLOW_DARK + fv + ansi.RESET + "" + ansi.YELLOW + tv + ansi.RESET + " (FULL: {})".format(variant["last_full"])
print("{}: {} {} ({})".format(prd, versioninfo, fhash, model))
except RequestException as e:
print("{} ({}): {}".format(prd, lastver, str(e)))
continue
if not prdcheck in prd:
continue
dev.curef = prd
dev.fwver = lastver
chk = CheckRequest(dev)
runner.run(chk)
if chk.success:
result = chk.get_result()
versioninfo = ansi.YELLOW_DARK + result.fvver + ansi.RESET + "" + ansi.YELLOW + result.tvver + ansi.RESET + " (FULL: {})".format(variant["last_full"])
print("{}: {} {} ({})".format(prd, versioninfo, result.filehash, model))
else:
print("{} ({}): {}".format(prd, lastver, chk.error))
tcllib.FotaCheck.write_info_if_dumps_found()

View File

@ -0,0 +1,5 @@
# -*- coding: utf-8 -*-
from .tcl import *
from .runner import *
from .serverselector import *

38
tcllib/requests/http.py Normal file
View File

@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
import requests
from collections import OrderedDict
class TimeoutException(Exception):
pass
class HttpRequest:
"""Provides all generic features for making HTTP GET requests"""
def __init__(self, url, timeout=10):
self.url = url
self.params = OrderedDict()
self.timeout = timeout
self.headers = {}
def reset_session(self):
"""Reset everything to default."""
self.sess = requests.Session()
self.sess.headers.update(self.headers)
def run(self):
"""Run query."""
try:
req = self.sess.get(self.url, params=self.params, timeout=self.timeout)
except requests.exceptions.Timeout as e:
raise TimeoutException(e)
return req
class HttpPostRequest(HttpRequest):
"""Provides all generic features for making HTTP POST requests"""
def run(self):
"""Run query."""
try:
req = self.sess.post(self.url, data=self.params, timeout=self.timeout)
except requests.exceptions.Timeout as e:
raise TimeoutException(e)
return req

48
tcllib/requests/runner.py Normal file
View File

@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
from . import tcl
from . import http
from . import serverselector
class UnknownMethodException(Exception):
pass
class RequestRunner:
def __init__(self, server_selector: serverselector.ServerSelector, https=True):
self.server_selector = server_selector
self.protocol = "https://" if https else "http://"
self.max_tries = 5
def get_http(self, method="GET") -> http.HttpRequest:
"""Returns the http class according to desired method."""
if method == "GET":
return http.HttpRequest
elif method == "POST":
return http.HttpPostRequest
raise UnknownMethodException("Unknown http method: {}".format(method))
def get_server(self) -> str:
"""Returns a master server."""
return self.server_selector.get_master_server()
def run(self, query: tcl.TclRequest, timeout: int=10) -> bool:
"""Runs the actual query."""
for _ in range(0, self.max_tries):
url = "{}{}{}".format(self.protocol, self.get_server(), query.uri)
http_handler = self.get_http(query.method)(url, timeout)
http_handler.headers = query.get_headers()
http_handler.params = query.get_params()
http_handler.reset_session()
self.server_selector.hook_prerequest()
try:
req = http_handler.run()
req.encoding = "utf-8"
done = query.is_done(req.status_code, req.text)
self.server_selector.hook_postrequest(done)
if done:
return done
except http.TimeoutException:
self.server_selector.hook_postrequest(False)
query.error = "Timeout."
query.error = "Max tries ({}) reached.".format(self.max_tries)
return False

View File

@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Tools to sort API servers to find the least awful one."""
import numpy
import time
MASTER_SERVERS = [
"g2master-us-east.tclclouds.com",
"g2master-us-west.tclclouds.com",
"g2master-eu-west.tclclouds.com",
"g2master-ap-south.tclclouds.com",
"g2master-ap-north.tclclouds.com",
"g2master-sa-east.tclclouds.com",
]
class ServerSelector:
"""Returns a random server to use."""
def __init__(self):
"""Init stuff"""
self.last_server = None
def get_master_server(self):
"""Return a random server."""
while True:
new_server = numpy.random.choice(MASTER_SERVERS)
if new_server != self.last_server:
break
self.last_server = new_server
return new_server
def hook_prerequest(self):
"""Hook to be called before doing request"""
pass
def hook_postrequest(self, successful: bool):
"""Hook to be called after request finished"""
pass
class ServerVoteSelector(ServerSelector):
"""Tries to return faster servers more often."""
def __init__(self):
"""Populate server list and weighting variables."""
self.last_server = None
self.master_servers_weights = [3] * len(MASTER_SERVERS)
self.check_time_sum = 3
self.check_time_count = 1
def get_master_server(self):
"""Return weighted choice from server list."""
weight_sum = 0
for i in self.master_servers_weights:
weight_sum += i
numpy_weights = []
for i in self.master_servers_weights:
numpy_weights.append(i/weight_sum)
self.last_server = numpy.random.choice(MASTER_SERVERS, p=numpy_weights)
return self.last_server
def master_server_downvote(self):
"""Decrease weight of last chosen server."""
idx = MASTER_SERVERS.index(self.last_server)
if self.master_servers_weights[idx] > 1:
self.master_servers_weights[idx] -= 1
def master_server_upvote(self):
"""Increase weight of last chosen server."""
idx = MASTER_SERVERS.index(self.last_server)
if self.master_servers_weights[idx] < 10:
self.master_servers_weights[idx] += 1
def check_time_add(self, duration):
"""Record connection time."""
self.check_time_sum += duration
self.check_time_count += 1
def check_time_avg(self):
"""Return average connection time."""
return self.check_time_sum / self.check_time_count
def master_server_vote_on_time(self, last_duration):
"""Change weight of a server based on average connection time."""
avg_duration = self.check_time_avg()
if last_duration < avg_duration - 0.5:
self.master_server_upvote()
elif last_duration > avg_duration + 0.5:
self.master_server_downvote()
def hook_prerequest(self):
"""Hook to be called before doing request"""
self.reqtime_start = time.perf_counter()
def hook_postrequest(self, successful: bool):
"""Hook to be called after request finished"""
reqtime = time.perf_counter() - self.reqtime_start
self.check_time_add(reqtime)
if successful:
self.master_server_vote_on_time(reqtime)
else:
self.master_server_downvote()

96
tcllib/requests/tcl.py Normal file
View File

@ -0,0 +1,96 @@
# -*- coding: utf-8 -*-
from .. import devices
from . import tclresult
from collections import OrderedDict
from defusedxml import ElementTree
class TclRequest:
def __init__(self):
self.uri = ""
self.result = None
self.error = None
self.success = False
def get_headers(self):
return {}
def get_params(self):
return {}
def is_done(self, http_status: int, contents: str):
"""Checks if query is done or needs retry."""
return False
def get_result(self):
"""Returns Result object."""
return None
class CheckRequest(TclRequest):
def __init__(self, device: devices.Device):
super().__init__()
self.uri = "/check.php"
self.method = "GET"
self.device = device
def get_headers(self):
return {"User-Agent": self.device.ua}
def get_params(self):
params = OrderedDict()
params["id"] = self.device.imei
params["curef"] = self.device.curef
params["fv"] = self.device.fwver
params["mode"] = self.device.mode
params["type"] = self.device.type
params["cltp"] = self.device.cltp
params["cktp"] = self.device.cktp
params["rtd"] = self.device.rtd
params["chnl"] = self.device.chnl
#params["osvs"] = self.device.osvs
#params["ckot"] = self.device.ckot
return params
def is_done(self, http_status: int, contents: str) -> bool:
ok_states = {
204: "No update available.",
404: "No data for requested CUREF/FV combination.",
}
if http_status == 200:
self.result = contents
self.success = True
return True
elif http_status in ok_states:
self.error = ok_states[http_status]
self.success = False
return True
elif http_status not in [500, 502, 503]:
# Errors OTHER than 500, 502 or 503 are probably
# errors where we don't need to retry
self.error ="HTTP {}.".format(http_status)
self.success = False
return True
return False
def get_result(self):
if not self.success:
return None
return tclresult.CheckResult(self.result)
# Check requests have 4 possible outcomes:
# 1. HTTP 200 with XML data - our desired info
# 2. HTTP 204 - means: no newer update available
# 3. HTTP 404 - means: invalid device or firmware version
# 4. anything else: server problem (esp. 500, 502, 503)
class DownloadRequest(TclRequest):
def __init__(self, device: devices.Device):
super().__init__()
self.uri = "/download_request.php"
self.method = "POST"
self.device = device
def get_headers(self):
return {"User-Agent": self.device.ua}

View File

@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
from defusedxml import ElementTree
class TclResult:
pass
class CheckResult(TclResult):
def __init__(self, xml: str):
self.raw_xml = xml
root = ElementTree.fromstring(xml)
self.curef = root.find("CUREF").text
self.fvver = root.find("VERSION").find("FV").text
self.tvver = root.find("VERSION").find("TV").text
self.fw_id = root.find("FIRMWARE").find("FW_ID").text
fileinfo = root.find("FIRMWARE").find("FILESET").find("FILE")
self.fileid = fileinfo.find("FILE_ID").text
self.filename = fileinfo.find("FILENAME").text
self.filesize = fileinfo.find("SIZE").text
self.filehash = fileinfo.find("CHECKSUM").text