1
0
mirror of https://github.com/mbirth/tcl_ota_check.git synced 2024-11-10 06:16:46 +00:00

Took a hacksaw and split tcllib into several small parts.

This commit is contained in:
Markus Birth 2018-02-03 01:36:08 +01:00
parent 4456f76032
commit 34ba8a0db0
Signed by: mbirth
GPG Key ID: A9928D7A098C3A9A
16 changed files with 440 additions and 378 deletions

View File

@ -6,6 +6,7 @@
import sys import sys
import tcllib import tcllib
import tcllib.argparser import tcllib.argparser
from tcllib import ansi
from requests.exceptions import RequestException, Timeout from requests.exceptions import RequestException, Timeout
tcllib.make_escapes_work() tcllib.make_escapes_work()
@ -46,8 +47,8 @@ for prd, variant in prds.items():
txt_tv = tv txt_tv = tv
if tv != lastver: if tv != lastver:
txt_tv = "{} (old: {} / OTA: {})".format( txt_tv = "{} (old: {} / OTA: {})".format(
tcllib.ANSI_CYAN + txt_tv + tcllib.ANSI_RESET, ansi.CYAN + txt_tv + ansi.RESET,
tcllib.ANSI_CYAN_DARK + variant["last_full"] + tcllib.ANSI_RESET, ansi.CYAN_DARK + variant["last_full"] + ansi.RESET,
variant["last_ota"] variant["last_ota"]
) )
else: else:

View File

@ -8,6 +8,7 @@ import requests
import sys import sys
import tcllib import tcllib
import tcllib.argparser import tcllib.argparser
from tcllib import ansi
from requests.exceptions import RequestException from requests.exceptions import RequestException
tcllib.make_escapes_work() tcllib.make_escapes_work()
@ -52,7 +53,7 @@ for prd, variant in prds.items():
fc.fv = lastver fc.fv = lastver
check_xml = fc.do_check(max_tries=20) check_xml = fc.do_check(max_tries=20)
curef, fv, tv, fw_id, fileid, fn, fsize, fhash = fc.parse_check(check_xml) curef, fv, tv, fw_id, fileid, fn, fsize, fhash = fc.parse_check(check_xml)
versioninfo = tcllib.ANSI_YELLOW_DARK + fv + tcllib.ANSI_RESET + "" + tcllib.ANSI_YELLOW + tv + tcllib.ANSI_RESET + " (FULL: {})".format(variant["last_full"]) versioninfo = ansi.YELLOW_DARK + fv + ansi.RESET + "" + ansi.YELLOW + tv + ansi.RESET + " (FULL: {})".format(variant["last_full"])
print("{}: {} {} ({})".format(prd, versioninfo, fhash, model)) print("{}: {} {} ({})".format(prd, versioninfo, fhash, model))
except RequestException as e: except RequestException as e:
print("{} ({}): {}".format(prd, lastver, str(e))) print("{} ({}): {}".format(prd, lastver, str(e)))

View File

@ -7,6 +7,7 @@ import collections
import sys import sys
import tcllib import tcllib
import tcllib.argparser import tcllib.argparser
from tcllib import ansi
from requests.exceptions import RequestException, Timeout from requests.exceptions import RequestException, Timeout
tcllib.make_escapes_work() tcllib.make_escapes_work()
@ -68,7 +69,7 @@ for center in sorted(prddict.keys()):
curef = "PRD-{}-{:03}".format(center, j) curef = "PRD-{}-{:03}".format(center, j)
done_count += 1 done_count += 1
print("Checking {} ({}/{})".format(curef, done_count, total_count)) print("Checking {} ({}/{})".format(curef, done_count, total_count))
print(tcllib.ANSI_UP_DEL, end="") print(ansi.UP_DEL, end="")
try: try:
fc.reset_session() fc.reset_session()
fc.curef = curef fc.curef = curef

View File

@ -7,6 +7,7 @@ import collections
import sys import sys
import tcllib import tcllib
import tcllib.argparser import tcllib.argparser
from tcllib import ansi
from requests.exceptions import RequestException, Timeout from requests.exceptions import RequestException, Timeout
# Variants to scan for # Variants to scan for
@ -56,7 +57,7 @@ for center in to_scan:
curef = "PRD-{:05}-{:3}".format(center, j) curef = "PRD-{:05}-{:3}".format(center, j)
done_count += 1 done_count += 1
print("Checking {} ({}/{})".format(curef, done_count, total_count)) print("Checking {} ({}/{})".format(curef, done_count, total_count))
print(tcllib.ANSI_UP_DEL, end="") print(ansi.UP_DEL, end="")
try: try:
fc.reset_session() fc.reset_session()
fc.curef = curef fc.curef = curef

View File

@ -6,6 +6,7 @@
import sys import sys
import tcllib import tcllib
import tcllib.argparser import tcllib.argparser
from tcllib import ansi
from requests.exceptions import RequestException, Timeout from requests.exceptions import RequestException, Timeout
tcllib.make_escapes_work() tcllib.make_escapes_work()
@ -54,7 +55,7 @@ total_count = len(allvers)
for fv in allvers: for fv in allvers:
done_count += 1 done_count += 1
print("Checking {} ({}/{})".format(fv, done_count, total_count)) print("Checking {} ({}/{})".format(fv, done_count, total_count))
print(tcllib.ANSI_UP_DEL, end="") print(ansi.UP_DEL, end="")
try: try:
fc.reset_session() fc.reset_session()
fc.fv = fv fc.fv = fv

View File

@ -2,43 +2,18 @@
# pylint: disable=C0111,C0326 # pylint: disable=C0111,C0326
import base64
import binascii
import enum import enum
import errno
import glob
import hashlib
import json
import os
import platform import platform
import random
import time
import xml.dom.minidom
import zlib
from collections import OrderedDict
from math import floor
import numpy
import requests import requests
from defusedxml import ElementTree from . import credentials
from . import devlist
DEVICELIST_URL = "https://tclota.birth-online.de/json_lastupdates.php" from . import dumpmgr
DEVICELIST_FILE = "prds.json" from . import servervote
DEVICELIST_CACHE_SECONDS = 86400 from . import tclcheck
from . import tclrequest
ANSI_UP_DEL = u"\u001b[F\u001b[K" from . import tclchecksum
ANSI_BLACK = u"\u001b[0;30m" from . import tclencheader
ANSI_RED_DARK = u"\u001b[0;31m" from . import xmltools
ANSI_GREEN_DARK = u"\u001b[0;32m"
ANSI_YELLOW_DARK = u"\u001b[0;33m"
ANSI_CYAN_DARK = u"\u001b[0;36m"
ANSI_SILVER = u"\u001b[0;37m"
ANSI_GREY = u"\u001b[1;30m"
ANSI_RED = u"\u001b[1;31m"
ANSI_GREEN = u"\u001b[1;32m"
ANSI_YELLOW = u"\u001b[1;33m"
ANSI_CYAN = u"\u001b[1;36m"
ANSI_WHITE = u"\u001b[1;37m"
ANSI_RESET = u"\u001b[0m"
def make_escapes_work(): def make_escapes_work():
system = platform.system() system = platform.system()
@ -54,7 +29,17 @@ def make_escapes_work():
def default_enum(enumname, vardict): def default_enum(enumname, vardict):
return enum.IntEnum(enumname, vardict, module=__name__, qualname="tcllib.FotaCheck.{}".format(enumname)) return enum.IntEnum(enumname, vardict, module=__name__, qualname="tcllib.FotaCheck.{}".format(enumname))
class FotaCheck: class FotaCheck(
tclcheck.TclCheck,
tclrequest.TclRequest,
tclchecksum.TclChecksum,
tclencheader.TclEncHeader,
servervote.ServerVote,
credentials.Credentials,
devlist.DevList,
dumpmgr.DumpMgr,
xmltools.XmlTools
):
VDKEY = b"eJwdjwEOwDAIAr8kKFr//7HhmqXp8AIIDrYAgg8byiUXrwRJRXja+d6iNxu0AhUooDCN9rd6rDLxmGIakUVWo3IGCTRWqCAt6X4jGEIUAxgN0eYWnp+LkpHQAg/PsO90ELsy0Npm/n2HbtPndFgGEV31R9OmT4O4nrddjc3Qt6nWscx7e+WRHq5UnOudtjw5skuV09pFhvmqnOEIs4ljPeel1wfLYUF4\n" VDKEY = b"eJwdjwEOwDAIAr8kKFr//7HhmqXp8AIIDrYAgg8byiUXrwRJRXja+d6iNxu0AhUooDCN9rd6rDLxmGIakUVWo3IGCTRWqCAt6X4jGEIUAxgN0eYWnp+LkpHQAg/PsO90ELsy0Npm/n2HbtPndFgGEV31R9OmT4O4nrddjc3Qt6nWscx7e+WRHq5UnOudtjw5skuV09pFhvmqnOEIs4ljPeel1wfLYUF4\n"
CKTP = default_enum("CKTP", ["AUTO", "MANUAL"]) CKTP = default_enum("CKTP", ["AUTO", "MANUAL"])
MODE = default_enum("MODE", {"OTA": 2, "FULL": 4}) MODE = default_enum("MODE", {"OTA": 2, "FULL": 4})
@ -98,340 +83,3 @@ class FotaCheck:
else: else:
self.sess.headers.update({"User-Agent": "tcl"}) self.sess.headers.update({"User-Agent": "tcl"})
return self.sess return self.sess
@staticmethod
def get_salt():
millis = floor(time.time() * 1000)
tail = "{:06d}".format(random.randint(0, 999999))
return "{}{}".format(str(millis), tail)
def get_master_server(self):
weight_sum = 0
for i in self.master_servers_weights:
weight_sum += i
numpy_weights = []
for i in self.master_servers_weights:
numpy_weights.append(i/weight_sum)
return numpy.random.choice(self.master_servers, p=numpy_weights)
def master_server_downvote(self):
idx = self.master_servers.index(self.g2master)
if self.master_servers_weights[idx] > 1:
self.master_servers_weights[idx] -= 1
def master_server_upvote(self):
idx = self.master_servers.index(self.g2master)
if self.master_servers_weights[idx] < 10:
self.master_servers_weights[idx] += 1
def check_time_add(self, duration):
self.check_time_sum += duration
self.check_time_count += 1
def check_time_avg(self):
return (self.check_time_sum / self.check_time_count)
def master_server_vote_on_time(self, last_duration, avg_duration):
if last_duration < avg_duration - 0.5:
self.master_server_upvote()
elif last_duration > avg_duration + 0.5:
self.master_server_downvote()
def write_dump(self, data):
outfile = os.path.normpath("logs/{}.xml".format(self.get_salt()))
if not os.path.exists(os.path.dirname(outfile)):
try:
os.makedirs(os.path.dirname(outfile))
except OSError as e:
if e.errno != errno.EEXIST:
raise
with open(outfile, "w", encoding="utf-8") as f:
f.write(data)
self.last_dump_filename = outfile
def delete_last_dump(self):
if self.last_dump_filename:
os.unlink(self.last_dump_filename)
self.last_dump_filename = None
@staticmethod
def write_info_if_dumps_found():
# To disable this info, uncomment the following line.
#return
files = glob.glob(os.path.normpath("logs/*.xml"))
if len(files) > 0:
print()
print("{}There are {} logs collected in the logs/ directory.{} Please consider uploading".format(ANSI_YELLOW, len(files), ANSI_RESET))
print("them to https://tclota.birth-online.de/ by running {}./upload_logs.py{}.".format(ANSI_CYAN, ANSI_RESET))
def do_check(self, https=True, timeout=10, max_tries=5):
protocol = "https://" if https else "http://"
url = protocol + self.g2master + "/check.php"
params = OrderedDict()
params["id"] = self.serid
params["curef"] = self.curef
params["fv"] = self.fv
params["mode"] = self.mode.value
params["type"] = self.ftype
params["cltp"] = self.cltp.value
params["cktp"] = self.cktp.value
params["rtd"] = self.rtd.value
params["chnl"] = self.chnl.value
#params["osvs"] = self.osvs
#params["ckot"] = self.ckot.value
last_response = None
for num_try in range(0, max_tries):
try:
reqtime_start = time.perf_counter()
req = self.sess.get(url, params=params, timeout=timeout)
reqtime = time.perf_counter() - reqtime_start
reqtime_avg = self.check_time_avg()
self.check_time_add(reqtime)
last_response = req
if req.status_code == 200:
self.master_server_vote_on_time(reqtime, reqtime_avg)
req.encoding = "utf-8" # Force encoding as server doesn't give one
self.write_dump(req.text)
return req.text
elif req.status_code == 204:
self.master_server_vote_on_time(reqtime, reqtime_avg)
raise requests.exceptions.HTTPError("No update available.", response=req)
elif req.status_code == 404:
self.master_server_vote_on_time(reqtime, reqtime_avg)
raise requests.exceptions.HTTPError("No data for requested CUREF/FV combination.", response=req)
elif req.status_code not in [500, 502, 503]:
self.master_server_downvote()
req.raise_for_status()
raise requests.exceptions.HTTPError("HTTP {}.".format(req.status_code), response=req)
except requests.exceptions.Timeout:
pass
# Something went wrong, try a different server
self.master_server_downvote()
self.g2master = self.get_master_server()
protocol = "https://" if https else "http://"
url = protocol + self.g2master + "/check.php"
raise requests.exceptions.RetryError("Max tries ({}) reached.".format(max_tries), response=last_response)
@staticmethod
def pretty_xml(xmlstr):
mdx = xml.dom.minidom.parseString(xmlstr)
return mdx.toprettyxml(indent=" ")
@staticmethod
def parse_check(xmlstr):
root = ElementTree.fromstring(xmlstr)
curef = root.find("CUREF").text
fv = root.find("VERSION").find("FV").text
tv = root.find("VERSION").find("TV").text
fw_id = root.find("FIRMWARE").find("FW_ID").text
fileinfo = root.find("FIRMWARE").find("FILESET").find("FILE")
fileid = fileinfo.find("FILE_ID").text
filename = fileinfo.find("FILENAME").text
filesize = fileinfo.find("SIZE").text
filehash = fileinfo.find("CHECKSUM").text
return curef, fv, tv, fw_id, fileid, filename, filesize, filehash
def get_vk2(self, params_dict, cltp):
params_dict["cltp"] = cltp
query = ""
for k, v in params_dict.items():
if len(query) > 0:
query += "&"
query += k + "=" + str(v)
vdk = zlib.decompress(binascii.a2b_base64(self.VDKEY))
query += vdk.decode("utf-8")
engine = hashlib.sha1()
engine.update(bytes(query, "utf-8"))
hexhash = engine.hexdigest()
return hexhash
@staticmethod
def get_devicelist(force=False, output_diff=True):
need_download = True
old_prds = None
try:
filestat = os.stat(DEVICELIST_FILE)
filemtime = filestat.st_mtime
if filemtime > time.time() - DEVICELIST_CACHE_SECONDS:
need_download = False
with open(DEVICELIST_FILE, "rt") as df:
old_prds = json.load(df)
except FileNotFoundError:
pass
if need_download or force:
prds_json = requests.get(DEVICELIST_URL).text
with open(DEVICELIST_FILE, "wt") as df:
df.write(prds_json)
with open(DEVICELIST_FILE, "rt") as df:
prds = json.load(df)
if old_prds and output_diff:
FotaCheck.print_prd_diff(old_prds, prds)
return prds
@staticmethod
def print_prd_diff(old_prds, new_prds):
added_prds = [prd for prd in new_prds if prd not in old_prds]
removed_prds = [prd for prd in old_prds if prd not in new_prds]
for prd in removed_prds:
print("> Removed device {} (was at {} / OTA: {}).".format(ANSI_RED + prd + ANSI_RESET, old_prds[prd]["last_full"], old_prds[prd]["last_ota"]))
for prd in added_prds:
print("> New device {} ({} / OTA: {}).".format(ANSI_GREEN + prd + ANSI_RESET, new_prds[prd]["last_full"], new_prds[prd]["last_ota"]))
for prd, pdata in new_prds.items():
if prd in added_prds:
continue
odata = old_prds[prd]
if pdata["last_full"] != odata["last_full"] and pdata["last_ota"] != odata["last_ota"]:
print("> {}: {}{} (OTA: {}{})".format(
prd,
ANSI_CYAN_DARK + str(odata["last_full"]) + ANSI_RESET,
ANSI_CYAN + str(pdata["last_full"]) + ANSI_RESET,
ANSI_YELLOW_DARK + str(odata["last_ota"]) + ANSI_RESET,
ANSI_YELLOW + str(pdata["last_ota"]) + ANSI_RESET
))
elif pdata["last_full"] != odata["last_full"]:
print("> {}: {}{} (FULL)".format(prd, ANSI_CYAN_DARK + str(odata["last_full"]) + ANSI_RESET, ANSI_CYAN + str(pdata["last_full"]) + ANSI_RESET))
elif pdata["last_ota"] != odata["last_ota"]:
print("> {}: {}{} (OTA)".format(prd, ANSI_YELLOW_DARK + str(odata["last_ota"]) + ANSI_RESET, ANSI_YELLOW + str(pdata["last_ota"]) + ANSI_RESET))
@staticmethod
def get_creds():
creds = {
b"YWNjb3VudA==": b"emhlbmdodWEuZ2Fv",
b"cGFzc3dvcmQ=": b"cWFydUQ0b2s=",
}
params = {base64.b64decode(key): base64.b64decode(val) for key, val in creds.items()}
return params
@staticmethod
def get_creds2():
creds = {
b"YWNjb3VudA==": b"VGVsZUV4dFRlc3Q=",
b"cGFzc3dvcmQ=": b"dDA1MjM=",
}
params = {base64.b64decode(key): base64.b64decode(val) for key, val in creds.items()}
return params
'''
private HashMap<String, String> buildDownloadUrisParams(UpdatePackageInfo updatePackageInfo) {
FotaLog.m28v(TAG, "doAfterCheck");
String salt = FotaUtil.salt();
HashMap linkedHashMap = new LinkedHashMap();
linkedHashMap.put("id", this.internalBuilder.getParam("id"));
linkedHashMap.put("salt", salt);
linkedHashMap.put("curef", updatePackageInfo.mCuref);
linkedHashMap.put("fv", updatePackageInfo.mFv);
linkedHashMap.put("tv", updatePackageInfo.mTv);
linkedHashMap.put("type", "Firmware");
linkedHashMap.put("fw_id", updatePackageInfo.mFirmwareId);
linkedHashMap.put("mode", "2");
linkedHashMap.put("vk", generateVk2((LinkedHashMap) linkedHashMap.clone()));
linkedHashMap.put("cltp", "10");
linkedHashMap.put("cktp", this.internalBuilder.getParam("cktp"));
linkedHashMap.put("rtd", this.internalBuilder.getParam("rtd"));
linkedHashMap.put("chnl", this.internalBuilder.getParam("chnl"));
return linkedHashMap;
}
'''
def do_request(self, curef, fv, tv, fw_id):
url = "https://" + self.g2master + "/download_request.php"
params = OrderedDict()
params["id"] = self.serid
params["salt"] = self.get_salt()
params["curef"] = curef
params["fv"] = fv
params["tv"] = tv
params["type"] = self.ftype
params["fw_id"] = fw_id
params["mode"] = self.mode.value
params["vk"] = self.get_vk2(params, self.cltp.value)
params["cltp"] = self.cltp.value
params["cktp"] = self.cktp.value
params["rtd"] = self.rtd.value
if self.mode == self.MODE.FULL:
params["foot"] = 1
params["chnl"] = self.chnl.value
#print(repr(dict(params)))
req = self.sess.post(url, data=params)
if req.status_code == 200:
req.encoding = "utf-8" # Force encoding as server doesn't give one
self.write_dump(req.text)
return req.text
else:
print("REQUEST: " + repr(req))
print(repr(req.headers))
print(repr(req.text))
raise SystemExit
@staticmethod
def parse_request(xmlstr):
root = ElementTree.fromstring(xmlstr)
file = root.find("FILE_LIST").find("FILE")
fileid = file.find("FILE_ID").text
fileurl = file.find("DOWNLOAD_URL").text
s3_fileurl_node = file.find("S3_DOWNLOAD_URL")
s3_fileurl = ""
if s3_fileurl_node:
s3_fileurl = s3_fileurl_node.text
slave_list = root.find("SLAVE_LIST").findall("SLAVE")
enc_list = root.find("SLAVE_LIST").findall("ENCRYPT_SLAVE")
s3_slave_list = root.find("SLAVE_LIST").findall("S3_SLAVE")
slaves = [s.text for s in slave_list]
encslaves = [s.text for s in enc_list]
s3_slaves = [s.text for s in s3_slave_list]
return fileid, fileurl, slaves, encslaves, s3_fileurl, s3_slaves
def do_encrypt_header(self, encslave, address):
params = self.get_creds2()
params[b"address"] = bytes(address, "utf-8")
url = "http://" + encslave + "/encrypt_header.php"
req = self.sess.post(url, data=params, verify=False)
# Expect "HTTP 206 Partial Content" response
if req.status_code == 206:
return req.content
else:
print("ENCRYPT: " + repr(req))
print(repr(req.headers))
print(repr(req.text))
raise SystemExit
def do_checksum(self, encslave, address, uri):
url = "http://" + encslave + "/checksum.php"
params = self.get_creds2()
payload = {address: uri}
payload_json = json.dumps(payload)
params[b"address"] = bytes(payload_json, "utf-8")
#print(repr(dict(params)))
req = self.sess.post(url, data=params)
if req.status_code == 200:
req.encoding = "utf-8" # Force encoding as server doesn't give one
self.write_dump(req.text)
# <ENCRYPT_FOOTER>2abfa6f6507044fec995efede5d818e62a0b19b5</ENCRYPT_FOOTER> means ERROR (invalid ADDRESS!)
if "<ENCRYPT_FOOTER>2abfa6f6507044fec995efede5d818e62a0b19b5</ENCRYPT_FOOTER>" in req.text:
print("INVALID URI: {}".format(uri))
raise SystemExit
return req.text
else:
print("CHECKSUM: " + repr(req))
print(repr(req.headers))
print(repr(req.text))
raise SystemExit
@staticmethod
def parse_checksum(xmlstr):
root = ElementTree.fromstring(xmlstr)
file = root.find("FILE_CHECKSUM_LIST").find("FILE")
file_addr = file.find("ADDRESS").text
sha1_enc_footer = file.find("ENCRYPT_FOOTER").text
sha1_footer = file.find("FOOTER").text
sha1_body = file.find("BODY").text
return file_addr, sha1_body, sha1_enc_footer, sha1_footer

16
tcllib/ansi.py Normal file
View File

@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
UP_DEL = u"\u001b[F\u001b[K"
BLACK = u"\u001b[0;30m"
RED_DARK = u"\u001b[0;31m"
GREEN_DARK = u"\u001b[0;32m"
YELLOW_DARK = u"\u001b[0;33m"
CYAN_DARK = u"\u001b[0;36m"
SILVER = u"\u001b[0;37m"
GREY = u"\u001b[1;30m"
RED = u"\u001b[1;31m"
GREEN = u"\u001b[1;32m"
YELLOW = u"\u001b[1;33m"
CYAN = u"\u001b[1;36m"
WHITE = u"\u001b[1;37m"
RESET = u"\u001b[0m"

22
tcllib/credentials.py Normal file
View File

@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
import base64
class Credentials:
@staticmethod
def get_creds():
creds = {
b"YWNjb3VudA==": b"emhlbmdodWEuZ2Fv",
b"cGFzc3dvcmQ=": b"cWFydUQ0b2s=",
}
params = {base64.b64decode(key): base64.b64decode(val) for key, val in creds.items()}
return params
@staticmethod
def get_creds2():
creds = {
b"YWNjb3VudA==": b"VGVsZUV4dFRlc3Q=",
b"cGFzc3dvcmQ=": b"dDA1MjM=",
}
params = {base64.b64decode(key): base64.b64decode(val) for key, val in creds.items()}
return params

65
tcllib/devlist.py Normal file
View File

@ -0,0 +1,65 @@
# -*- coding: utf-8 -*-
import json
import os
import requests
import time
from . import ansi
DEVICELIST_URL = "https://tclota.birth-online.de/json_lastupdates.php"
DEVICELIST_FILE = "prds.json"
DEVICELIST_CACHE_SECONDS = 86400
class DevList:
@staticmethod
def get_devicelist(force=False, output_diff=True):
need_download = True
old_prds = None
try:
filestat = os.stat(DEVICELIST_FILE)
filemtime = filestat.st_mtime
if filemtime > time.time() - DEVICELIST_CACHE_SECONDS:
need_download = False
with open(DEVICELIST_FILE, "rt") as df:
old_prds = json.load(df)
except FileNotFoundError:
pass
if need_download or force:
prds_json = requests.get(DEVICELIST_URL).text
with open(DEVICELIST_FILE, "wt") as df:
df.write(prds_json)
with open(DEVICELIST_FILE, "rt") as df:
prds = json.load(df)
if old_prds and output_diff:
DevList.print_prd_diff(old_prds, prds)
return prds
@staticmethod
def print_prd_diff(old_prds, new_prds):
added_prds = [prd for prd in new_prds if prd not in old_prds]
removed_prds = [prd for prd in old_prds if prd not in new_prds]
for prd in removed_prds:
print("> Removed device {} (was at {} / OTA: {}).".format(ansi.RED + prd + ansi.RESET, old_prds[prd]["last_full"], old_prds[prd]["last_ota"]))
for prd in added_prds:
print("> New device {} ({} / OTA: {}).".format(ansi.GREEN + prd + ansi.RESET, new_prds[prd]["last_full"], new_prds[prd]["last_ota"]))
for prd, pdata in new_prds.items():
if prd in added_prds:
continue
odata = old_prds[prd]
if pdata["last_full"] != odata["last_full"] and pdata["last_ota"] != odata["last_ota"]:
print("> {}: {}{} (OTA: {}{})".format(
prd,
ansi.CYAN_DARK + str(odata["last_full"]) + ansi.RESET,
ansi.CYAN + str(pdata["last_full"]) + ansi.RESET,
ansi.YELLOW_DARK + str(odata["last_ota"]) + ansi.RESET,
ansi.YELLOW + str(pdata["last_ota"]) + ansi.RESET
))
elif pdata["last_full"] != odata["last_full"]:
print("> {}: {}{} (FULL)".format(prd, ansi.CYAN_DARK + str(odata["last_full"]) + ansi.RESET, ansi.CYAN + str(pdata["last_full"]) + ansi.RESET))
elif pdata["last_ota"] != odata["last_ota"]:
print("> {}: {}{} (OTA)".format(prd, ansi.YELLOW_DARK + str(odata["last_ota"]) + ansi.RESET, ansi.YELLOW + str(pdata["last_ota"]) + ansi.RESET))

34
tcllib/dumpmgr.py Normal file
View File

@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
import errno
import glob
import os
from . import ansi
class DumpMgr:
def write_dump(self, data):
outfile = os.path.normpath("logs/{}.xml".format(self.get_salt()))
if not os.path.exists(os.path.dirname(outfile)):
try:
os.makedirs(os.path.dirname(outfile))
except OSError as e:
if e.errno != errno.EEXIST:
raise
with open(outfile, "w", encoding="utf-8") as f:
f.write(data)
self.last_dump_filename = outfile
def delete_last_dump(self):
if self.last_dump_filename:
os.unlink(self.last_dump_filename)
self.last_dump_filename = None
@staticmethod
def write_info_if_dumps_found():
# To disable this info, uncomment the following line.
#return
files = glob.glob(os.path.normpath("logs/*.xml"))
if len(files) > 0:
print()
print("{}There are {} logs collected in the logs/ directory.{} Please consider uploading".format(ansi.YELLOW, len(files), ansi.RESET))
print("them to https://tclota.birth-online.de/ by running {}./upload_logs.py{}.".format(ansi.CYAN, ansi.RESET))

36
tcllib/servervote.py Normal file
View File

@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
import numpy
class ServerVote:
def get_master_server(self):
weight_sum = 0
for i in self.master_servers_weights:
weight_sum += i
numpy_weights = []
for i in self.master_servers_weights:
numpy_weights.append(i/weight_sum)
return numpy.random.choice(self.master_servers, p=numpy_weights)
def master_server_downvote(self):
idx = self.master_servers.index(self.g2master)
if self.master_servers_weights[idx] > 1:
self.master_servers_weights[idx] -= 1
def master_server_upvote(self):
idx = self.master_servers.index(self.g2master)
if self.master_servers_weights[idx] < 10:
self.master_servers_weights[idx] += 1
def check_time_add(self, duration):
self.check_time_sum += duration
self.check_time_count += 1
def check_time_avg(self):
return (self.check_time_sum / self.check_time_count)
def master_server_vote_on_time(self, last_duration, avg_duration):
if last_duration < avg_duration - 0.5:
self.master_server_upvote()
elif last_duration > avg_duration + 0.5:
self.master_server_downvote()

70
tcllib/tclcheck.py Normal file
View File

@ -0,0 +1,70 @@
# -*- coding: utf-8 -*-
import time
from collections import OrderedDict
import requests
from defusedxml import ElementTree
class TclCheck:
def do_check(self, https=True, timeout=10, max_tries=5):
protocol = "https://" if https else "http://"
url = protocol + self.g2master + "/check.php"
params = OrderedDict()
params["id"] = self.serid
params["curef"] = self.curef
params["fv"] = self.fv
params["mode"] = self.mode.value
params["type"] = self.ftype
params["cltp"] = self.cltp.value
params["cktp"] = self.cktp.value
params["rtd"] = self.rtd.value
params["chnl"] = self.chnl.value
#params["osvs"] = self.osvs
#params["ckot"] = self.ckot.value
last_response = None
for num_try in range(0, max_tries):
try:
reqtime_start = time.perf_counter()
req = self.sess.get(url, params=params, timeout=timeout)
reqtime = time.perf_counter() - reqtime_start
reqtime_avg = self.check_time_avg()
self.check_time_add(reqtime)
last_response = req
if req.status_code == 200:
self.master_server_vote_on_time(reqtime, reqtime_avg)
req.encoding = "utf-8" # Force encoding as server doesn't give one
self.write_dump(req.text)
return req.text
elif req.status_code == 204:
self.master_server_vote_on_time(reqtime, reqtime_avg)
raise requests.exceptions.HTTPError("No update available.", response=req)
elif req.status_code == 404:
self.master_server_vote_on_time(reqtime, reqtime_avg)
raise requests.exceptions.HTTPError("No data for requested CUREF/FV combination.", response=req)
elif req.status_code not in [500, 502, 503]:
self.master_server_downvote()
req.raise_for_status()
raise requests.exceptions.HTTPError("HTTP {}.".format(req.status_code), response=req)
except requests.exceptions.Timeout:
pass
# Something went wrong, try a different server
self.master_server_downvote()
self.g2master = self.get_master_server()
protocol = "https://" if https else "http://"
url = protocol + self.g2master + "/check.php"
raise requests.exceptions.RetryError("Max tries ({}) reached.".format(max_tries), response=last_response)
@staticmethod
def parse_check(xmlstr):
root = ElementTree.fromstring(xmlstr)
curef = root.find("CUREF").text
fv = root.find("VERSION").find("FV").text
tv = root.find("VERSION").find("TV").text
fw_id = root.find("FIRMWARE").find("FW_ID").text
fileinfo = root.find("FIRMWARE").find("FILESET").find("FILE")
fileid = fileinfo.find("FILE_ID").text
filename = fileinfo.find("FILENAME").text
filesize = fileinfo.find("SIZE").text
filehash = fileinfo.find("CHECKSUM").text
return curef, fv, tv, fw_id, fileid, filename, filesize, filehash

39
tcllib/tclchecksum.py Normal file
View File

@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
import json
from defusedxml import ElementTree
class TclChecksum:
def do_checksum(self, encslave, address, uri):
url = "http://" + encslave + "/checksum.php"
params = self.get_creds2()
payload = {address: uri}
payload_json = json.dumps(payload)
params[b"address"] = bytes(payload_json, "utf-8")
#print(repr(dict(params)))
req = self.sess.post(url, data=params)
if req.status_code == 200:
req.encoding = "utf-8" # Force encoding as server doesn't give one
self.write_dump(req.text)
# <ENCRYPT_FOOTER>2abfa6f6507044fec995efede5d818e62a0b19b5</ENCRYPT_FOOTER> means ERROR (invalid ADDRESS!)
if "<ENCRYPT_FOOTER>2abfa6f6507044fec995efede5d818e62a0b19b5</ENCRYPT_FOOTER>" in req.text:
print("INVALID URI: {}".format(uri))
raise SystemExit
return req.text
else:
print("CHECKSUM: " + repr(req))
print(repr(req.headers))
print(repr(req.text))
raise SystemExit
@staticmethod
def parse_checksum(xmlstr):
root = ElementTree.fromstring(xmlstr)
file = root.find("FILE_CHECKSUM_LIST").find("FILE")
file_addr = file.find("ADDRESS").text
sha1_enc_footer = file.find("ENCRYPT_FOOTER").text
sha1_footer = file.find("FOOTER").text
sha1_body = file.find("BODY").text
return file_addr, sha1_body, sha1_enc_footer, sha1_footer

16
tcllib/tclencheader.py Normal file
View File

@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
class TclEncHeader:
def do_encrypt_header(self, encslave, address):
params = self.get_creds2()
params[b"address"] = bytes(address, "utf-8")
url = "http://" + encslave + "/encrypt_header.php"
req = self.sess.post(url, data=params, verify=False)
# Expect "HTTP 206 Partial Content" response
if req.status_code == 206:
return req.content
else:
print("ENCRYPT: " + repr(req))
print(repr(req.headers))
print(repr(req.text))
raise SystemExit

102
tcllib/tclrequest.py Normal file
View File

@ -0,0 +1,102 @@
# -*- coding: utf-8 -*-
import binascii
import hashlib
import random
import time
from math import floor
import zlib
from collections import OrderedDict
from defusedxml import ElementTree
'''
private HashMap<String, String> buildDownloadUrisParams(UpdatePackageInfo updatePackageInfo) {
FotaLog.m28v(TAG, "doAfterCheck");
String salt = FotaUtil.salt();
HashMap linkedHashMap = new LinkedHashMap();
linkedHashMap.put("id", this.internalBuilder.getParam("id"));
linkedHashMap.put("salt", salt);
linkedHashMap.put("curef", updatePackageInfo.mCuref);
linkedHashMap.put("fv", updatePackageInfo.mFv);
linkedHashMap.put("tv", updatePackageInfo.mTv);
linkedHashMap.put("type", "Firmware");
linkedHashMap.put("fw_id", updatePackageInfo.mFirmwareId);
linkedHashMap.put("mode", "2");
linkedHashMap.put("vk", generateVk2((LinkedHashMap) linkedHashMap.clone()));
linkedHashMap.put("cltp", "10");
linkedHashMap.put("cktp", this.internalBuilder.getParam("cktp"));
linkedHashMap.put("rtd", this.internalBuilder.getParam("rtd"));
linkedHashMap.put("chnl", this.internalBuilder.getParam("chnl"));
return linkedHashMap;
}
'''
class TclRequest:
@staticmethod
def get_salt():
millis = floor(time.time() * 1000)
tail = "{:06d}".format(random.randint(0, 999999))
return "{}{}".format(str(millis), tail)
def get_vk2(self, params_dict, cltp):
params_dict["cltp"] = cltp
query = ""
for k, v in params_dict.items():
if len(query) > 0:
query += "&"
query += k + "=" + str(v)
vdk = zlib.decompress(binascii.a2b_base64(self.VDKEY))
query += vdk.decode("utf-8")
engine = hashlib.sha1()
engine.update(bytes(query, "utf-8"))
hexhash = engine.hexdigest()
return hexhash
def do_request(self, curef, fv, tv, fw_id):
url = "https://" + self.g2master + "/download_request.php"
params = OrderedDict()
params["id"] = self.serid
params["salt"] = self.get_salt()
params["curef"] = curef
params["fv"] = fv
params["tv"] = tv
params["type"] = self.ftype
params["fw_id"] = fw_id
params["mode"] = self.mode.value
params["vk"] = self.get_vk2(params, self.cltp.value)
params["cltp"] = self.cltp.value
params["cktp"] = self.cktp.value
params["rtd"] = self.rtd.value
if self.mode == self.MODE.FULL:
params["foot"] = 1
params["chnl"] = self.chnl.value
#print(repr(dict(params)))
req = self.sess.post(url, data=params)
if req.status_code == 200:
req.encoding = "utf-8" # Force encoding as server doesn't give one
self.write_dump(req.text)
return req.text
else:
print("REQUEST: " + repr(req))
print(repr(req.headers))
print(repr(req.text))
raise SystemExit
@staticmethod
def parse_request(xmlstr):
root = ElementTree.fromstring(xmlstr)
file = root.find("FILE_LIST").find("FILE")
fileid = file.find("FILE_ID").text
fileurl = file.find("DOWNLOAD_URL").text
s3_fileurl_node = file.find("S3_DOWNLOAD_URL")
s3_fileurl = ""
if s3_fileurl_node:
s3_fileurl = s3_fileurl_node.text
slave_list = root.find("SLAVE_LIST").findall("SLAVE")
enc_list = root.find("SLAVE_LIST").findall("ENCRYPT_SLAVE")
s3_slave_list = root.find("SLAVE_LIST").findall("S3_SLAVE")
slaves = [s.text for s in slave_list]
encslaves = [s.text for s in enc_list]
s3_slaves = [s.text for s in s3_slave_list]
return fileid, fileurl, slaves, encslaves, s3_fileurl, s3_slaves

9
tcllib/xmltools.py Normal file
View File

@ -0,0 +1,9 @@
# -*- coding: utf-8 -*-
import xml.dom.minidom
class XmlTools:
@staticmethod
def pretty_xml(xmlstr):
mdx = xml.dom.minidom.parseString(xmlstr)
return mdx.toprettyxml(indent=" ")