1
0

Compare commits

28 Commits

Author SHA1 Message Date
mbirth caf4a6df52 Add demo credentials 2018-09-01 12:00:00 +02:00
mbirth 0436260abe Small fix in output 2018-08-29 21:12:30 +02:00
mbirth 5066499a4e Added tclchecknew.py and CheckNewRequest 2018-07-10 22:20:14 +02:00
mbirth d06feb6e4a New version finder for specific modes 2018-02-14 12:14:50 +01:00
mbirth 6cbea7ed3a Fix README. 2018-07-02 15:45:58 +02:00
mbirth 8780e60cb5 Rename scripts for easier handling. 2018-07-02 15:44:43 +02:00
thurask 375a87c4b8 enable multiple prd filtering for findprd 2018-06-30 11:42:00 -04:00
thurask 4be09b5b3b simplify findprd filtering 2018-06-11 22:54:31 -04:00
thurask e4313a713f enable key2 for allota 2018-06-11 22:42:56 -04:00
thurask d5b5c94f42 add defaultparser to gapfill, uploader 2018-06-11 22:40:49 -04:00
thurask 42a8c5653b add new syntax option to findprd2 2018-06-11 22:37:38 -04:00
thurask acf55d8e23 add new syntax support to findprd 2018-06-11 19:57:29 -04:00
thurask 6866732133 update readme 2018-06-03 10:47:26 -04:00
mbirth 8fb286b020 Detect OTAs to older versions (~) and don't log if already known (%). 2018-05-29 10:59:47 +02:00
mbirth 7604fe2335 Skip all DTEK PRDs as they get their OTAs from elsewhere. 2018-05-16 13:22:47 +02:00
thurask 5f0e334318 add no prefix option for findprd for certain devices 2018-05-08 00:05:24 -04:00
mbirth f3aae4ce7f Fix file perms. 2018-03-01 01:43:26 +01:00
mbirth 995b451d11 Detect broken JSON and download new. 2018-03-01 01:42:59 +01:00
mbirth 16d33b0cb6 More trying to please Scrutinizer. 2018-02-15 00:39:01 +01:00
mbirth 1707bcad26 Show off Scrutinizer score on GitHub. 2018-02-15 00:29:07 +01:00
mbirth 7d578b979b Improve a bit more. Also added docstrings. 2018-02-15 00:27:01 +01:00
mbirth bfc7288f61 Type hinting and try to remove complexity. 2018-02-15 00:20:28 +01:00
thurask c29d32d39f add db updater script 2018-02-13 14:18:21 -05:00
thurask eef0f755e3 pep8, pylint, isort 2018-02-10 20:38:38 -05:00
mbirth 3059ff13f7 Fix naming issue. 2018-02-11 01:56:56 +01:00
mbirth 75d23502a3 Merge branch 'rewrite' 2018-02-11 01:53:03 +01:00
mbirth bab58107fe More cleanup. 2018-02-11 01:51:24 +01:00
mbirth e582642936 Cleanup, moved dumpmgr class. 2018-02-11 01:48:47 +01:00
39 changed files with 791 additions and 585 deletions
+19 -4
View File
@@ -1,3 +1,8 @@
[![Scrutinizer Code Quality](https://scrutinizer-ci.com/g/mbirth/tcl_ota_check/badges/quality-score.png?b=master)](https://scrutinizer-ci.com/g/mbirth/tcl_ota_check/?branch=master)
[![Build Status](https://scrutinizer-ci.com/g/mbirth/tcl_ota_check/badges/build.png?b=master)](https://scrutinizer-ci.com/g/mbirth/tcl_ota_check/build-status/master)
[![Code Intelligence Status](https://scrutinizer-ci.com/g/mbirth/tcl_ota_check/badges/code-intelligence.svg?b=master)](https://scrutinizer-ci.com/code-intelligence)
TCL OTA Check
=============
@@ -75,22 +80,22 @@ Checks for the latest OTA (i.e. partial updates for over-the-air installation) v
for all different models and variants.
### tclcheck_findprd.py
### tclfindprd.py
Scans for not yet known variants of a model.
### tclcheck_findprd2.py
### tclfindprd2.py
Scans for not yet known models.
### tclcheck_findver.py
### tclfindver.py
Scans for not yet known firmware versions.
### tclcheck_gapfill.py
### tclgapfill.py
Queries the [database server](https://tclota.birth-online.de/) for known versions and tries to find
OTA files not yet in the database.
@@ -106,6 +111,16 @@ Universal tool to query TCL's servers in different ways to manually check for a
Queries the checksum for a specific FULL file.
### tcldown.py
Downloads a firmware file from given file ID.
### update_db.py
Updates local copy of database.
### upload_logs.py
Uploads all collected server answers to the [database server](https://tclota.birth-online.de/).
+7 -4
View File
@@ -6,14 +6,14 @@
"""Checks for the latest FULL or OTA updates for specified PRD number."""
import os
import random
import sys
from tcllib import argparser
from tcllib.devices import Device
from tcllib.requests import RequestRunner, CheckRequest, DownloadRequest, \
ChecksumRequest, EncryptHeaderRequest, ServerSelector, \
write_info_if_dumps_found
from tcllib.dumpmgr import write_info_if_dumps_found
from tcllib.requests import (CheckRequest, ChecksumRequest, DownloadRequest,
EncryptHeaderRequest, RequestRunner,
ServerSelector)
dpdesc = """
@@ -33,6 +33,7 @@ args = dp.parse_args(sys.argv[1:])
dev = Device(args.prd[0], args.fvver)
dev.imei = "3531510"
def sel_mode(txtmode, autoval, rawval):
"""Handle custom mode."""
if rawval:
@@ -43,6 +44,7 @@ def sel_mode(txtmode, autoval, rawval):
return dev.MODE_STATES["OTA"]
return dev.MODE_STATES["FULL"]
def sel_cltp(txtmode, autoval, rawval):
"""Handle custom CLTP."""
if rawval:
@@ -53,6 +55,7 @@ def sel_cltp(txtmode, autoval, rawval):
return dev.CLTP_STATES["DESKTOP"]
return dev.CLTP_STATES["MOBILE"]
if args.imei:
print("Use specified IMEI: {}".format(args.imei))
dev.imei = args.imei
Regular → Executable
+4 -3
View File
@@ -9,7 +9,8 @@ import sys
from tcllib import ansi, argparser, devlist
from tcllib.devices import DesktopDevice
from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector, write_info_if_dumps_found
from tcllib.dumpmgr import write_info_if_dumps_found
from tcllib.requests import CheckRequest, RequestRunner, ServerVoteSelector
dev = DesktopDevice()
@@ -47,8 +48,8 @@ for prd, variant in prds.items():
if result.tvver != lastver:
txt_tv = "{} (old: {} / OTA: {})".format(
ansi.CYAN + txt_tv + ansi.RESET,
ansi.CYAN_DARK + variant["last_full"] + ansi.RESET,
variant["last_ota"]
ansi.CYAN_DARK + str(variant["last_full"]) + ansi.RESET,
str(variant["last_ota"])
)
else:
result.delete_dump()
Regular → Executable
+4 -1
View File
@@ -9,7 +9,8 @@ import sys
from tcllib import ansi, argparser, devlist
from tcllib.devices import MobileDevice
from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector, write_info_if_dumps_found
from tcllib.dumpmgr import write_info_if_dumps_found
from tcllib.requests import CheckRequest, RequestRunner, ServerVoteSelector
dev = MobileDevice()
@@ -40,6 +41,8 @@ runner = RequestRunner(ServerVoteSelector())
runner.max_tries = 20
for prd, variant in prds.items():
if "PRD" not in prd:
continue
model = variant["variant"]
lastver = variant["last_ota"]
lastver = variant["last_full"] if lastver is None else lastver
+71
View File
@@ -0,0 +1,71 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Find all OTA updates for a given PRD."""
import sys
from tcllib import ansi, argparser
from tcllib.devices import MobileDevice
from tcllib.dumpmgr import write_info_if_dumps_found
from tcllib.requests import CheckRequest, RequestRunner, ServerVoteSelector
dpdesc = """
Finds all valid OTA updates for a given PRD. Scan range can be set by
startver and endver switches.
"""
dp = argparser.DefaultParser(__file__, dpdesc)
dp.add_argument("prd", help="CU Reference #, e.g. PRD-63117-011")
dp.add_argument("startver", help="Beginning of scan range", nargs="?", default="AAA000")
dp.add_argument("endver", help="End of scan range", nargs="?", default="AAZ999")
args = dp.parse_args(sys.argv[1:])
dev = MobileDevice()
dev.curef = args.prd
dev.mode = 3
start_ver = args.startver
end_ver = args.endver
print("Valid firmwares for model {} (between {} and {}):".format(dev.curef, start_ver, end_ver))
cur_ver = start_ver
allvers = []
while True:
allvers.append(cur_ver)
if cur_ver == end_ver:
break
letters = list(cur_ver[:3])
num = int(cur_ver[3:6])
num += 1
if num > 999:
num = 0
for i in range(2, -1, -1):
if letters[i] == "Z":
letters[i] = "A"
continue
letters[i] = chr(ord(letters[i])+1)
break
cur_ver = "{:3}{:03d}".format("".join(letters), num)
runner = RequestRunner(ServerVoteSelector(), https=False)
runner.max_tries = 20
done_count = 0
total_count = len(allvers)
for fv in allvers:
done_count += 1
print("Checking {} ({}/{})".format(fv, done_count, total_count))
print(ansi.UP_DEL, end="")
dev.fwver = fv
chk = CheckRequest(dev)
runner.run(chk)
if chk.success:
chkres = chk.get_result()
txt_tv = chkres.tvver
print("{}: {}{} {}".format(dev.curef, fv, txt_tv, chkres.filehash))
print("Scan complete.")
write_info_if_dumps_found()
+71
View File
@@ -0,0 +1,71 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Find all OTA updates for a given PRD."""
import sys
from tcllib import ansi, argparser
from tcllib.devices import MobileDevice
from tcllib.dumpmgr import write_info_if_dumps_found
from tcllib.requests import CheckRequest, RequestRunner, ServerVoteSelector
dpdesc = """
Finds all valid OTA updates for a given PRD. Scan range can be set by
startver and endver switches.
"""
dp = argparser.DefaultParser(__file__, dpdesc)
dp.add_argument("prd", help="CU Reference #, e.g. PRD-63117-011")
dp.add_argument("startver", help="Beginning of scan range", nargs="?", default="AAA000")
dp.add_argument("endver", help="End of scan range", nargs="?", default="AAZ999")
args = dp.parse_args(sys.argv[1:])
dev = MobileDevice()
dev.curef = args.prd
dev.mode = 5
start_ver = args.startver
end_ver = args.endver
print("Valid firmwares for model {} (between {} and {}):".format(dev.curef, start_ver, end_ver))
cur_ver = start_ver
allvers = []
while True:
allvers.append(cur_ver)
if cur_ver == end_ver:
break
letters = list(cur_ver[:3])
num = int(cur_ver[3:6])
num += 1
if num > 999:
num = 0
for i in range(2, -1, -1):
if letters[i] == "Z":
letters[i] = "A"
continue
letters[i] = chr(ord(letters[i])+1)
break
cur_ver = "{:3}{:03d}".format("".join(letters), num)
runner = RequestRunner(ServerVoteSelector(), https=False)
runner.max_tries = 20
done_count = 0
total_count = len(allvers)
for fv in allvers:
done_count += 1
print("Checking {} ({}/{})".format(fv, done_count, total_count))
print(ansi.UP_DEL, end="")
dev.fwver = fv
chk = CheckRequest(dev)
runner.run(chk)
if chk.success:
chkres = chk.get_result()
txt_tv = chkres.tvver
print("{}: {}{} {}".format(dev.curef, fv, txt_tv, chkres.filehash))
print("Scan complete.")
write_info_if_dumps_found()
+71
View File
@@ -0,0 +1,71 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Find all OTA updates for a given PRD."""
import sys
from tcllib import ansi, argparser
from tcllib.devices import MobileDevice
from tcllib.dumpmgr import write_info_if_dumps_found
from tcllib.requests import CheckRequest, RequestRunner, ServerVoteSelector
dpdesc = """
Finds all valid OTA updates for a given PRD. Scan range can be set by
startver and endver switches.
"""
dp = argparser.DefaultParser(__file__, dpdesc)
dp.add_argument("prd", help="CU Reference #, e.g. PRD-63117-011")
dp.add_argument("startver", help="Beginning of scan range", nargs="?", default="AAA000")
dp.add_argument("endver", help="End of scan range", nargs="?", default="AAZ999")
args = dp.parse_args(sys.argv[1:])
dev = MobileDevice()
dev.curef = args.prd
dev.mode = 6
start_ver = args.startver
end_ver = args.endver
print("Valid firmwares for model {} (between {} and {}):".format(dev.curef, start_ver, end_ver))
cur_ver = start_ver
allvers = []
while True:
allvers.append(cur_ver)
if cur_ver == end_ver:
break
letters = list(cur_ver[:3])
num = int(cur_ver[3:6])
num += 1
if num > 999:
num = 0
for i in range(2, -1, -1):
if letters[i] == "Z":
letters[i] = "A"
continue
letters[i] = chr(ord(letters[i])+1)
break
cur_ver = "{:3}{:03d}".format("".join(letters), num)
runner = RequestRunner(ServerVoteSelector(), https=False)
runner.max_tries = 20
done_count = 0
total_count = len(allvers)
for fv in allvers:
done_count += 1
print("Checking {} ({}/{})".format(fv, done_count, total_count))
print(ansi.UP_DEL, end="")
dev.fwver = fv
chk = CheckRequest(dev)
runner.run(chk)
if chk.success:
chkres = chk.get_result()
txt_tv = chkres.tvver
print("{}: {}{} {}".format(dev.curef, fv, txt_tv, chkres.filehash))
print("Scan complete.")
write_info_if_dumps_found()
+128
View File
@@ -0,0 +1,128 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Checks for the latest FULL or OTA updates for specified PRD number."""
import os
import sys
from tcllib import argparser
from tcllib.devices import Device
from tcllib.dumpmgr import write_info_if_dumps_found
from tcllib.requests import (CheckNewRequest, ChecksumRequest, DownloadRequest,
EncryptHeaderRequest, RequestRunner,
ServerSelector)
dpdesc = """
Checks for the latest FULL updates for the specified PRD number or for an OTA from the
version specified as fvver.
"""
dp = argparser.DefaultParser(__file__, dpdesc)
dp.add_argument("prd", nargs=1, help="CU Reference #, e.g. PRD-63117-011")
dp.add_argument("fvver", nargs="?", help="Firmware version to check for OTA updates, e.g. AAM481 (omit to run FULL check)", default="AAA000")
dp.add_argument("-i", "--imei", help="use specified IMEI instead of default", type=str)
dp.add_argument("-m", "--mode", help="force type of update to check for", default="auto", type=str, choices=["full", "ota"])
dp.add_argument("-t", "--type", help="force type of check to run", default="auto", type=str, choices=["desktop", "mobile"])
dp.add_argument("--rawmode", help="override --mode with raw value (2=OTA, 4=FULL)", metavar="MODE")
dp.add_argument("--rawcltp", help="override --type with raw value (10=MOBILE, 2010=DESKTOP)", metavar="CLTP")
args = dp.parse_args(sys.argv[1:])
dev = Device(args.prd[0], args.fvver)
dev.imei = "3531510"
def sel_mode(txtmode, autoval, rawval):
"""Handle custom mode."""
if rawval:
return rawval
if txtmode == "auto":
return autoval
elif txtmode == "ota":
return dev.MODE_STATES["OTA"]
return dev.MODE_STATES["FULL"]
def sel_cltp(txtmode, autoval, rawval):
"""Handle custom CLTP."""
if rawval:
return rawval
if txtmode == "auto":
return autoval
elif txtmode == "desktop":
return dev.CLTP_STATES["DESKTOP"]
return dev.CLTP_STATES["MOBILE"]
if args.imei:
print("Use specified IMEI: {}".format(args.imei))
dev.imei = args.imei
if args.fvver == "AAA000":
dev.mode = sel_mode(args.mode, dev.MODE_STATES["FULL"], args.rawmode)
dev.cltp = sel_cltp(args.type, dev.CLTP_STATES["DESKTOP"], args.rawcltp)
else:
dev.mode = sel_mode(args.mode, dev.MODE_STATES["OTA"], args.rawmode)
dev.cltp = sel_cltp(args.type, dev.CLTP_STATES["MOBILE"], args.rawcltp)
print("Mode: {}".format(dev.mode))
print("CLTP: {}".format(dev.cltp))
runner = RequestRunner(ServerSelector())
# Check for update
chk = CheckNewRequest(dev)
runner.run(chk)
if not chk.success:
print("{}".format(chk.error))
sys.exit(2)
chkres = chk.get_result()
print(chkres.pretty_xml())
# Request download
dlr = DownloadRequest(dev, chkres.tvver, chkres.fw_id)
runner.run(dlr)
if not dlr.success:
print("{}".format(dlr.error))
sys.exit(3)
dlrres = dlr.get_result()
print(dlrres.pretty_xml())
if dlrres.encslaves:
encrunner = RequestRunner(ServerSelector(dlrres.encslaves), https=False)
cks = ChecksumRequest(dlrres.fileurl, dlrres.fileurl)
encrunner.run(cks)
if not cks.success:
print("{}".format(cks.error))
sys.exit(4)
cksres = cks.get_result()
print(cksres.pretty_xml())
for s in dlrres.slaves:
print("http://{}{}".format(s, dlrres.fileurl))
for s in dlrres.s3_slaves:
print("http://{}{}".format(s, dlrres.s3_fileurl))
if dev.mode == dev.MODE_STATES["FULL"]:
hdr = EncryptHeaderRequest(dlrres.fileurl)
encrunner.run(hdr)
if not hdr.success:
print("{}".format(hdr.error))
sys.exit(5)
hdrres = hdr.get_result()
headname = "header_{}.bin".format(chkres.tvver)
headdir = "headers"
if not os.path.exists(headdir):
os.makedirs(headdir)
if len(hdrres.rawdata) == 4194320:
# TODO: Check sha1sum
print("Header length check passed. Writing to {}.".format(headname))
with open(os.path.join(headdir, headname), "wb") as f:
f.write(hdrres.rawdata)
else:
print("Header length invalid ({}).".format(len(hdrres.rawdata)))
write_info_if_dumps_found()
Regular → Executable
+1 -1
View File
@@ -8,7 +8,7 @@
import sys
from tcllib import argparser
from tcllib.requests import RequestRunner, ChecksumRequest, ServerSelector
from tcllib.requests import ChecksumRequest, RequestRunner, ServerSelector
encslaves = [
"54.238.56.196",
Regular → Executable
+4 -3
View File
@@ -10,9 +10,9 @@ import sys
from tcllib import argparser
from tcllib.devices import DesktopDevice
from tcllib.requests import RequestRunner, CheckRequest, DownloadRequest, \
ChecksumRequest, EncryptHeaderRequest, ServerSelector, \
write_info_if_dumps_found
from tcllib.dumpmgr import write_info_if_dumps_found
from tcllib.requests import (DownloadRequest, EncryptHeaderRequest,
RequestRunner, ServerSelector)
dpdesc = """
@@ -31,6 +31,7 @@ args = dp.parse_args(sys.argv[1:])
dev = DesktopDevice()
def sel_mode(defaultmode, rawval):
"""Handle custom mode."""
if rawval:
+22 -11
View File
@@ -10,19 +10,20 @@ import sys
from tcllib import ansi, argparser, devlist
from tcllib.devices import DesktopDevice
from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector, \
write_info_if_dumps_found
from tcllib.dumpmgr import write_info_if_dumps_found
from tcllib.requests import CheckRequest, RequestRunner, ServerVoteSelector
dpdesc = """
Finds new PRD numbers for all known variants, or specified variants with tocheck. Scan range
can be set by floor and ceiling switches.
"""
dp = argparser.DefaultParser(__file__, dpdesc)
dp.add_argument("tocheck", help="CU Reference # to filter scan results", nargs="?", default=None)
dp.add_argument("tocheck", help="CU Reference #(s) to filter scan results", nargs="*", default=None)
dp.add_argument("-f", "--floor", help="Beginning of scan range", dest="floor", nargs="?", type=int, default=0)
dp.add_argument("-c", "--ceiling", help="End of scan range", dest="ceiling", nargs="?", type=int, default=999)
dp.add_argument("-l", "--local", help="Force using local database", dest="local", action="store_true", default=False)
dp.add_argument("-np", "--no-prefix", help="Skip 'PRD-' prefix", dest="noprefix", action="store_true", default=False)
dp.add_argument("-k2", "--key2", help="V2 syntax", dest="key2mode", action="store_true", default=False)
args = dp.parse_args(sys.argv[1:])
floor = args.floor
@@ -37,27 +38,37 @@ print(" OK")
print("Valid PRDs not already in database:")
prds = [x.replace("PRD-", "").split("-") for x in prd_db]
prdx = list({x[0]: x[1]} for x in prds)
prds = [x.replace("APBI-PRD", "").replace("PRD-", "").replace("-", "") for x in prd_db]
prdx = list({x[0:5]: x[5:]} for x in prds)
prddict = collections.defaultdict(list)
for prdc in prdx:
for key, value in prdc.items():
prddict[key].append(value)
if args.tocheck is not None:
args.tocheck = args.tocheck.replace("PRD-", "")
if not isinstance(args.tocheck, list):
args.tocheck = [args.tocheck]
args.tocheck = [toch.replace("APBI-PRD", "").replace("PRD-", "") for toch in args.tocheck]
prdkeys = list(prddict.keys())
for k in prdkeys:
if k != args.tocheck:
if k not in args.tocheck:
del prddict[k]
if not prddict:
prddict[args.tocheck] = []
for toch in args.tocheck:
if toch not in prddict.keys():
prddict[toch] = []
dev = DesktopDevice()
runner = RequestRunner(ServerVoteSelector(), https=False)
runner.max_tries = 20
if args.key2mode:
prefix = "APBI-PRD"
suffix = ""
else:
prefix = "" if args.noprefix else "PRD-"
suffix = "-"
for center in sorted(prddict.keys()):
tails = [int(i) for i in prddict[center]]
safes = [g for g in range(floor, ceiling) if g not in tails]
@@ -65,7 +76,7 @@ for center in sorted(prddict.keys()):
done_count = 0
print("Checking {} variant codes for model {}.".format(total_count, center))
for j in safes:
curef = "PRD-{}-{:03}".format(center, j)
curef = "{}{}{}{:03}".format(prefix, center, suffix, j)
done_count += 1
print("Checking {} ({}/{})".format(curef, done_count, total_count))
print(ansi.UP_DEL, end="")
+8 -4
View File
@@ -9,8 +9,8 @@ import sys
from tcllib import ansi, argparser, devlist
from tcllib.devices import DesktopDevice
from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector, \
write_info_if_dumps_found
from tcllib.dumpmgr import write_info_if_dumps_found
from tcllib.requests import CheckRequest, RequestRunner, ServerVoteSelector
# Variants to scan for
@@ -24,6 +24,7 @@ dp = argparser.DefaultParser(__file__, dpdesc)
dp.add_argument("floor", nargs="?", help="Model number to start with", type=int, default=63116)
dp.add_argument("ceiling", nargs="?", help="Model number to end with", type=int, default=99999)
dp.add_argument("-l", "--local", help="Force using local database", dest="local", action="store_true", default=False)
dp.add_argument("-k2", "--key2", help="V2 syntax", dest="key2mode", action="store_true", default=False)
args = dp.parse_args(sys.argv[1:])
floor = args.floor
@@ -38,7 +39,7 @@ print(" OK")
print("Valid PRDs not already in database:")
known_centers = set(int(x.replace("PRD-", "").split("-")[0]) for x in prd_db)
known_centers = set(int(x.replace("PRD-", "").replace("APBI-PRD", "")[0:5]) for x in prd_db)
scan_list = set(range(floor, ceiling))
to_scan = scan_list - known_centers
@@ -52,7 +53,10 @@ runner.max_tries = 20
for center in to_scan:
for j in SCAN_VARIANTS:
curef = "PRD-{:05}-{:3}".format(center, j)
if args.key2mode:
curef = "APBI-PRD{:05}{:3}".format(center, j)
else:
curef = "PRD-{:05}-{:3}".format(center, j)
done_count += 1
print("Checking {} ({}/{})".format(curef, done_count, total_count))
print(ansi.UP_DEL, end="")
+3 -3
View File
@@ -7,10 +7,10 @@
import sys
from tcllib import ansi, argparser, devlist
from tcllib import ansi, argparser
from tcllib.devices import MobileDevice
from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector, \
write_info_if_dumps_found
from tcllib.dumpmgr import write_info_if_dumps_found
from tcllib.requests import CheckRequest, RequestRunner, ServerVoteSelector
dpdesc = """
+22 -5
View File
@@ -6,12 +6,22 @@
"""Query existence of missing OTAs."""
import json
import sys
import requests
from tcllib import ansi
from tcllib import argparser
from tcllib.devices import MobileDevice
from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector, \
write_info_if_dumps_found
from tcllib.dumpmgr import write_info_if_dumps_found
from tcllib.requests import CheckRequest, RequestRunner, ServerVoteSelector
dpdesc = """
Queries the database server for known versions and tries to find OTA files not yet in the database.
"""
dp = argparser.DefaultParser(__file__, dpdesc)
args = dp.parse_args(sys.argv[1:])
del args
# 1. Fetch list of missing OTAs (e.g. from ancient versions to current)
@@ -43,8 +53,15 @@ for prd, data in versions.items():
chk = CheckRequest(dev)
runner.run(chk)
if chk.success:
print("", end="", flush=True)
num_item += 1
if chk.result.tvver == data["latest_ota"]:
print("", end="", flush=True)
num_item += 1
elif chk.result.tvver in data["update_map"] and ver in data["update_map"][chk.result.tvver]:
# Delete dump as we already know the information
chk.result.delete_dump()
print("%", end="", flush=True)
else:
print("~", end="", flush=True)
else:
print("", end="", flush=True)
print("")
+5 -30
View File
@@ -1,35 +1,10 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Library for TCL API work and related functions."""
import requests
from . import (dumpmgr, servervote, tclcheck, tclchecksum, tclencheader,
tclrequest)
class FotaCheck(
tclcheck.TclCheckMixin,
tclrequest.TclRequestMixin,
tclchecksum.TclChecksumMixin,
tclencheader.TclEncHeaderMixin,
servervote.ServerVoteMixin,
dumpmgr.DumpMgrMixin
):
"""Main API handler class."""
def __init__(self):
"""Handle mixins and populate variables."""
super().__init__()
self.reset_session()
def reset_session(self, device=None):
"""Reset everything to default."""
self.g2master = self.get_master_server()
self.sess = requests.Session()
if device:
self.sess.headers.update({"User-Agent": device.ua})
return self.sess
from .ansi import *
from .argparser import *
from .devices import *
from .devlist import *
from .dumpmgr import *
+3 -2
View File
@@ -1,3 +1,4 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Pseudo-devices for desktop/mobile requests"""
@@ -27,7 +28,7 @@ class Device():
self.chnl = self.CHNL_STATES["WIFI"]
self.cktp = self.CKTP_STATES["MANUAL"]
self.ckot = self.CKOT_STATES["ALL"]
self.ua = "tcl"
self.uagent = "tcl"
def is_rooted(self):
"""Get RTD as boolean."""
@@ -71,7 +72,7 @@ class MobileDevice(Device):
self.imei = "3531510"
self.set_cltp("MOBILE")
self.set_mode("OTA")
self.ua = "com.tcl.fota/5.1.0.2.0029.0, Android"
self.uagent = "com.tcl.fota/5.1.0.2.0029.0, Android"
class DesktopDevice(Device):
+38 -20
View File
@@ -19,7 +19,7 @@ DEVICELIST_FILE = "prds.json"
DEVICELIST_CACHE_SECONDS = 86400
def load_local_devicelist():
def _load_local_devicelist():
"""Load local devicelist and return decoded JSON (or None) and need_download status."""
need_download = True
try:
@@ -29,22 +29,18 @@ def load_local_devicelist():
need_download = False
with open(DEVICELIST_FILE, "rt") as dlfile:
return json.load(dlfile), need_download
except FileNotFoundError:
except (FileNotFoundError, json.decoder.JSONDecodeError):
return None, True
def get_devicelist(force=False, output_diff=True, local=False):
"""Return device list from saved database."""
old_prds, need_download = load_local_devicelist()
if local:
return old_prds
if need_download or force:
def _download_devicelist(doit: bool):
"""Download device list if doit is set. Or do nothing."""
if doit:
prds_json = requests.get(DEVICELIST_URL).text
with open(DEVICELIST_FILE, "wt") as dlfile:
dlfile.write(prds_json)
def _load_devicelist_with_diff(output_diff: bool, old_prds: dict = {}) -> dict:
"""Load local devicelist and output diff if requested."""
with open(DEVICELIST_FILE, "rt") as dlfile:
prds = json.load(dlfile)
@@ -53,8 +49,19 @@ def get_devicelist(force=False, output_diff=True, local=False):
return prds
def get_devicelist(force: bool=False, output_diff: bool=True, local: bool=False) -> dict:
"""Return device list from saved database."""
old_prds, need_download = _load_local_devicelist()
def print_versions_diff(old_data, new_data):
if local:
return old_prds
_download_devicelist(need_download or force)
return _load_devicelist_with_diff(output_diff, old_prds)
def print_versions_diff(old_data: dict, new_data: dict):
"""Print version changes between old and new databases."""
prd = new_data["curef"]
if new_data["last_full"] != old_data["last_full"] and new_data["last_ota"] != old_data["last_ota"]:
@@ -70,17 +77,28 @@ def print_versions_diff(old_data, new_data):
elif new_data["last_ota"] != old_data["last_ota"]:
print("> {}: {}{} (OTA)".format(prd, ansi.YELLOW_DARK + str(old_data["last_ota"]) + ansi.RESET, ansi.YELLOW + str(new_data["last_ota"]) + ansi.RESET))
def print_prd_diff(old_prds, new_prds):
"""Print PRD changes between old and new databases."""
added_prds = [prd for prd in new_prds if prd not in old_prds]
removed_prds = [prd for prd in old_prds if prd not in new_prds]
def _print_removed_prds(prds_data: dict, removed_prds: list):
"""Print details of selected PRDs as removed."""
for prd in removed_prds:
print("> Removed device {} (was at {} / OTA: {}).".format(ansi.RED + prd + ansi.RESET, old_prds[prd]["last_full"], old_prds[prd]["last_ota"]))
print("> Removed device {} (was at {} / OTA: {}).".format(ansi.RED + prd + ansi.RESET, prds_data[prd]["last_full"], prds_data[prd]["last_ota"]))
def _print_added_prds(prds_data: dict, added_prds: list):
"""Print details of selected PRDs as added."""
for prd in added_prds:
print("> New device {} ({} / OTA: {}).".format(ansi.GREEN + prd + ansi.RESET, new_prds[prd]["last_full"], new_prds[prd]["last_ota"]))
print("> New device {} ({} / OTA: {}).".format(ansi.GREEN + prd + ansi.RESET, prds_data[prd]["last_full"], prds_data[prd]["last_ota"]))
def _print_changed_prds(old_prds: dict, new_prds: dict, skip_prds: list):
"""Print details of changed PRDs."""
for prd, pdata in new_prds.items():
if prd in added_prds:
if prd in skip_prds:
continue
odata = old_prds[prd]
print_versions_diff(odata, pdata)
def print_prd_diff(old_prds: dict, new_prds: dict):
"""Print PRD changes between old and new databases."""
added_prds = [prd for prd in new_prds if prd not in old_prds]
removed_prds = [prd for prd in old_prds if prd not in new_prds]
_print_removed_prds(old_prds, removed_prds)
_print_added_prds(new_prds, added_prds)
_print_changed_prds(old_prds, new_prds, added_prds)
+21 -21
View File
@@ -15,23 +15,34 @@ from math import floor
from . import ansi
class DumpMgrMixin:
"""A mixin component for XML dump management."""
def get_timestamp_random():
"""Generate timestamp + random part to avoid collisions."""
millis = floor(time.time() * 1000)
tail = "{:06d}".format(random.randint(0, 999999))
return "{}_{}".format(str(millis), tail)
def write_info_if_dumps_found():
"""Notify user to upload dumps if present."""
# To disable this info, uncomment the following line.
# return
files = glob.glob(os.path.normpath("logs/*.xml"))
if files:
print()
print("{}There are {} logs collected in the logs/ directory.{} Please consider uploading".format(ansi.YELLOW, len(files), ansi.RESET))
print("them to https://tclota.birth-online.de/ by running {}./upload_logs.py{}.".format(ansi.CYAN, ansi.RESET))
class DumpMgr:
"""A class for XML dump management."""
def __init__(self):
"""Populate dump file name."""
self.last_dump_filename = None
@staticmethod
def get_timestamp_random():
"""Generate timestamp + random part to avoid collisions."""
millis = floor(time.time() * 1000)
tail = "{:06d}".format(random.randint(0, 999999))
return "{}_{}".format(str(millis), tail)
def write_dump(self, data):
"""Write dump to file."""
outfile = os.path.normpath("logs/{}.xml".format(self.get_timestamp_random()))
outfile = os.path.normpath("logs/{}.xml".format(get_timestamp_random()))
if not os.path.exists(os.path.dirname(outfile)):
try:
os.makedirs(os.path.dirname(outfile))
@@ -47,14 +58,3 @@ class DumpMgrMixin:
if self.last_dump_filename:
os.unlink(self.last_dump_filename)
self.last_dump_filename = None
@staticmethod
def write_info_if_dumps_found():
"""Notify user to upload dumps if present."""
# To disable this info, uncomment the following line.
# return
files = glob.glob(os.path.normpath("logs/*.xml"))
if files:
print()
print("{}There are {} logs collected in the logs/ directory.{} Please consider uploading".format(ansi.YELLOW, len(files), ansi.RESET))
print("them to https://tclota.birth-online.de/ by running {}./upload_logs.py{}.".format(ansi.CYAN, ansi.RESET))
+5 -2
View File
@@ -1,9 +1,12 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Library for generic TCL API requests."""
from .checkrequest import CheckRequest
from .downloadrequest import DownloadRequest
from .checknewrequest import CheckNewRequest
from .checksumrequest import ChecksumRequest
from .downloadrequest import DownloadRequest
from .encryptheaderrequest import EncryptHeaderRequest
from .runner import *
from .serverselector import *
from .dumpmgr import write_info_if_dumps_found
+104
View File
@@ -0,0 +1,104 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Generic update check request."""
import binascii
import hashlib
import random
import time
import zlib
from collections import OrderedDict
from math import floor
from .. import devices
from .tclrequest import TclRequest
from .tclresult import CheckResult
VDKEY = "010010000110111101110111001000000110000101110010011001010010000001111001011011110111010100100000011001110110010101110100001000000111010001101000011010010111001100100000011010110110010101111001001000000111011101101111011100100110010000111111"
VDKEY_B64Z = b"eJwdjwEOwDAIAr8kKFr//7HhmqXp8AIIDrYAgg8byiUXrwRJRXja+d6iNxu0AhUooDCN9rd6rDLxmGIakUVWo3IGCTRWqCAt6X4jGEIUAxgN0eYWnp+LkpHQAg/PsO90ELsy0Npm/n2HbtPndFgGEV31R9OmT4O4nrddjc3Qt6nWscx7e+WRHq5UnOudtjw5skuV09pFhvmqnOEIs4ljPeel1wfLYUF4\n"
def get_salt():
"""Generate cryptographic salt."""
millis = floor(time.time() * 1000)
tail = "{:06d}".format(random.randint(0, 999999))
return "{}{}".format(millis, tail)
def get_vk2(params_dict, cltp):
"""Generate salted hash of API parameters."""
#params_dict["cltp"] = cltp
query = ""
for key, val in params_dict.items():
if query:
query += "&"
query += key + "=" + str(val)
#vdk = zlib.decompress(binascii.a2b_base64(VDKEY_B64Z))
#query += vdk.decode("utf-8")
query += VDKEY
engine = hashlib.sha1()
engine.update(bytes(query, "utf-8"))
hexhash = engine.hexdigest()
return hexhash
class CheckNewRequest(TclRequest):
"""Generic update check request."""
def __init__(self, device: devices.Device):
"""Populate variables.."""
super().__init__()
self.uri = "/check_new.php"
self.method = "GET"
self.device = device
def get_headers(self):
"""Return request headers."""
return {"User-Agent": "GOTU Client v10.1.1"}
def get_params(self):
"""Return request parameters."""
params = OrderedDict()
params["id"] = self.device.imei
params["salt"] = get_salt()
params["curef"] = self.device.curef
params["fv"] = self.device.fwver
params["type"] = self.device.type
params["mode"] = self.device.mode
params["cltp"] = self.device.cltp
params["vk"] = get_vk2(params, self.device.cltp)
#params["cktp"] = self.device.cktp
#params["rtd"] = self.device.rtd
#params["chnl"] = self.device.chnl
#params["osvs"] = self.device.osvs
#params["ckot"] = self.device.ckot
print(repr(params))
return params
def is_done(self, http_status: int, contents: str) -> bool:
"""Handle request result."""
ok_states = {
204: "No update available.",
404: "No data for requested CUREF/FV combination.",
}
if http_status == 200:
self.response = contents
self.result = CheckResult(contents)
self.success = True
return True
elif http_status in ok_states:
self.error = ok_states[http_status]
self.success = False
return True
elif http_status not in [500, 502, 503]:
# Errors OTHER than 500, 502 or 503 are probably
# errors where we don't need to retry
self.error = "HTTP {}.".format(http_status)
self.success = False
return True
return False
# Check requests have 4 possible outcomes:
# 1. HTTP 200 with XML data - our desired info
# 2. HTTP 204 - means: no newer update available
# 3. HTTP 404 - means: invalid device or firmware version
# 4. anything else: server problem (esp. 500, 502, 503)
+14 -3
View File
@@ -1,21 +1,31 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Generic update check request."""
from collections import OrderedDict
from .. import devices
from .tclrequest import TclRequest
from .tclresult import CheckResult
class CheckRequest(TclRequest):
"""Generic update check request."""
def __init__(self, device: devices.Device):
"""Populate variables.."""
super().__init__()
self.uri = "/check.php"
self.method = "GET"
self.device = device
def get_headers(self):
return {"User-Agent": self.device.ua}
"""Return request headers."""
return {"User-Agent": self.device.uagent}
def get_params(self):
"""Return request parameters."""
params = OrderedDict()
params["id"] = self.device.imei
params["curef"] = self.device.curef
@@ -31,6 +41,7 @@ class CheckRequest(TclRequest):
return params
def is_done(self, http_status: int, contents: str) -> bool:
"""Handle request result."""
ok_states = {
204: "No update available.",
404: "No data for requested CUREF/FV combination.",
@@ -47,7 +58,7 @@ class CheckRequest(TclRequest):
elif http_status not in [500, 502, 503]:
# Errors OTHER than 500, 502 or 503 are probably
# errors where we don't need to retry
self.error ="HTTP {}.".format(http_status)
self.error = "HTTP {}.".format(http_status)
self.success = False
return True
return False
+13 -2
View File
@@ -1,13 +1,21 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from collections import OrderedDict
"""Generic file checksum request."""
import json
from .. import credentials, devices
from collections import OrderedDict
from .. import credentials
from .tclrequest import TclRequest
from .tclresult import ChecksumResult
class ChecksumRequest(TclRequest):
"""Generic file checksum request."""
def __init__(self, address, file_uri):
"""Populate variables."""
super().__init__()
# NOTE: THIS HAS TO BE RUN ON AN ENCSLAVE
self.uri = "/checksum.php"
@@ -16,9 +24,11 @@ class ChecksumRequest(TclRequest):
self.file_uri = file_uri
def get_headers(self):
"""Return request headers."""
return {"User-Agent": "tcl"}
def get_params(self):
"""Return request parameters."""
params = OrderedDict()
params.update(credentials.get_creds2())
payload = {self.address: self.file_uri}
@@ -27,6 +37,7 @@ class ChecksumRequest(TclRequest):
return params
def is_done(self, http_status: int, contents: str) -> bool:
"""Handle request result."""
if http_status == 200:
# <ENCRYPT_FOOTER>2abfa6f6507044fec995efede5d818e62a0b19b5</ENCRYPT_FOOTER> means ERROR (invalid ADDRESS!)
if "<ENCRYPT_FOOTER>2abfa6f6507044fec995efede5d818e62a0b19b5</ENCRYPT_FOOTER>" in contents:
+13 -1
View File
@@ -1,5 +1,8 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Generic file download request."""
import binascii
import hashlib
import random
@@ -7,6 +10,7 @@ import time
import zlib
from collections import OrderedDict
from math import floor
from .. import devices
from .tclrequest import TclRequest
from .tclresult import DownloadResult
@@ -21,6 +25,7 @@ def get_salt():
tail = "{:06d}".format(random.randint(0, 999999))
return "{}{}".format(str(millis), tail)
def get_vk2(params_dict, cltp):
"""Generate salted hash of API parameters."""
params_dict["cltp"] = cltp
@@ -36,8 +41,12 @@ def get_vk2(params_dict, cltp):
hexhash = engine.hexdigest()
return hexhash
class DownloadRequest(TclRequest):
"""Generic file download request."""
def __init__(self, device: devices.Device, tvver: str, fw_id: str):
"""Populate variables."""
super().__init__()
self.uri = "/download_request.php"
self.method = "POST"
@@ -46,9 +55,11 @@ class DownloadRequest(TclRequest):
self.fw_id = fw_id
def get_headers(self):
return {"User-Agent": self.device.ua}
"""Return request headers."""
return {"User-Agent": self.device.uagent}
def get_params(self):
"""Return request parameters."""
params = OrderedDict()
params["id"] = self.device.imei
params["salt"] = get_salt()
@@ -68,6 +79,7 @@ class DownloadRequest(TclRequest):
return params
def is_done(self, http_status: int, contents: str) -> bool:
"""Handle request result."""
if http_status == 200:
self.response = contents
self.result = DownloadResult(contents)
-58
View File
@@ -1,58 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Tools to manage dumps of API requests."""
import errno
import glob
import os
import random
import time
from math import floor
from .. import ansi
def get_timestamp_random():
"""Generate timestamp + random part to avoid collisions."""
millis = floor(time.time() * 1000)
tail = "{:06d}".format(random.randint(0, 999999))
return "{}_{}".format(str(millis), tail)
def write_info_if_dumps_found():
"""Notify user to upload dumps if present."""
# To disable this info, uncomment the following line.
# return
files = glob.glob(os.path.normpath("logs/*.xml"))
if files:
print()
print("{}There are {} logs collected in the logs/ directory.{} Please consider uploading".format(ansi.YELLOW, len(files), ansi.RESET))
print("them to https://tclota.birth-online.de/ by running {}./upload_logs.py{}.".format(ansi.CYAN, ansi.RESET))
class DumpMgr:
"""A class for XML dump management."""
def __init__(self):
"""Populate dump file name."""
self.last_dump_filename = None
def write_dump(self, data):
"""Write dump to file."""
outfile = os.path.normpath("logs/{}.xml".format(get_timestamp_random()))
if not os.path.exists(os.path.dirname(outfile)):
try:
os.makedirs(os.path.dirname(outfile))
except OSError as err:
if err.errno != errno.EEXIST:
raise
with open(outfile, "w", encoding="utf-8") as fhandle:
fhandle.write(data)
self.last_dump_filename = outfile
def delete_last_dump(self):
"""Delete last dump."""
if self.last_dump_filename:
os.unlink(self.last_dump_filename)
self.last_dump_filename = None
+12 -1
View File
@@ -1,12 +1,20 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Generic encrypted header download request."""
from collections import OrderedDict
from .. import credentials, devices
from .. import credentials
from .tclrequest import TclRequest
from .tclresult import EncryptHeaderResult
class EncryptHeaderRequest(TclRequest):
"""Generic encrypted header download request."""
def __init__(self, file_uri):
"""Populate variables."""
super().__init__()
# NOTE: THIS HAS TO BE RUN ON AN ENCSLAVE
self.uri = "/encrypt_header.php"
@@ -15,15 +23,18 @@ class EncryptHeaderRequest(TclRequest):
self.file_uri = file_uri
def get_headers(self):
"""Return request headers."""
return {"User-Agent": "tcl"}
def get_params(self):
"""Return request parameters."""
params = OrderedDict()
params.update(credentials.get_creds2())
params["address"] = bytes(self.file_uri, "utf-8")
return params
def is_done(self, http_status: int, contents: str) -> bool:
"""Handle request result."""
# Expect "HTTP 206 Partial Content" response
if http_status == 206:
self.result = EncryptHeaderResult(contents)
+15 -5
View File
@@ -1,13 +1,21 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import requests
"""Base HTTP requests."""
from collections import OrderedDict
import requests
class TimeoutException(Exception):
"""Ignore timeouts."""
pass
class HttpRequest:
"""Provides all generic features for making HTTP GET requests"""
def __init__(self, url, timeout=10):
self.url = url
self.params = OrderedDict()
@@ -23,16 +31,18 @@ class HttpRequest:
"""Run query."""
try:
req = self.sess.get(self.url, params=self.params, timeout=self.timeout)
except requests.exceptions.Timeout as e:
raise TimeoutException(e)
except requests.exceptions.Timeout as exc:
raise TimeoutException(exc)
return req
class HttpPostRequest(HttpRequest):
"""Provides all generic features for making HTTP POST requests"""
def run(self):
"""Run query."""
try:
req = self.sess.post(self.url, data=self.params, timeout=self.timeout)
except requests.exceptions.Timeout as e:
raise TimeoutException(e)
except requests.exceptions.Timeout as exc:
raise TimeoutException(exc)
return req
+13 -5
View File
@@ -1,20 +1,28 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Generic request executors."""
from . import http, serverselector
from .tclrequest import TclRequest
from . import http
from . import serverselector
class UnknownMethodException(Exception):
"""Ignore unknown methods."""
pass
class RequestRunner:
"""Generic request executor."""
def __init__(self, server_selector: serverselector.ServerSelector, https=True):
"""Populate variables."""
self.server_selector = server_selector
self.protocol = "https://" if https else "http://"
self.max_tries = 5
def get_http(self, method="GET") -> http.HttpRequest:
"""Returns the http class according to desired method."""
"""Return the http class according to desired method."""
if method == "GET":
return http.HttpRequest
elif method == "POST":
@@ -22,11 +30,11 @@ class RequestRunner:
raise UnknownMethodException("Unknown http method: {}".format(method))
def get_server(self) -> str:
"""Returns a master server."""
"""Return a master server."""
return self.server_selector.get_master_server()
def run(self, query: TclRequest, timeout: int=10) -> bool:
"""Runs the actual query."""
"""Run the actual query."""
for _ in range(0, self.max_tries):
url = "{}{}{}".format(self.protocol, self.get_server(), query.uri)
http_handler = self.get_http(query.method)(url, timeout)
+5 -1
View File
@@ -1,12 +1,14 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Tools to sort API servers to find the least awful one."""
import numpy
import time
import numpy
MASTER_SERVERS = [
"g2master-us-east.tclclouds.com",
@@ -17,6 +19,7 @@ MASTER_SERVERS = [
"g2master-sa-east.tclclouds.com",
]
class ServerSelector:
"""Returns a random server to use."""
@@ -45,6 +48,7 @@ class ServerSelector:
"""Hook to be called after request finished"""
pass
class ServerVoteSelector(ServerSelector):
"""Tries to return faster servers more often."""
+11 -2
View File
@@ -1,9 +1,16 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Generic TCL request object."""
from . import tclresult
class TclRequest:
"""Generic TCL request object."""
def __init__(self):
"""Populate variables."""
self.uri = ""
self.rawmode = False
self.response = None
@@ -12,15 +19,17 @@ class TclRequest:
self.success = False
def get_headers(self):
"""Return request headers."""
return {}
def get_params(self):
"""Return request parameters."""
return {}
def is_done(self, http_status: int, contents: str):
"""Checks if query is done or needs retry."""
"""Check if query is done or needs retry."""
return False
def get_result(self) -> tclresult.TclResult:
"""Returns Result object."""
"""Return Result object."""
return self.result
+36 -13
View File
@@ -1,19 +1,26 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Generic TCL API result handlers."""
import xml.dom.minidom
from defusedxml import ElementTree
from . import dumpmgr
from .. import dumpmgr
class TclResult:
def __init__(self, xml: str):
self.raw_xml = xml
"""Generic TCL API result."""
def __init__(self, xmlstr: str):
"""Populate variables."""
self.raw_xml = xmlstr
self.dumper = dumpmgr.DumpMgr()
self.dumper.write_dump(xml)
self.dumper.write_dump(xmlstr)
def delete_dump(self):
"""Delete last dump."""
self.dumper.delete_last_dump()
def pretty_xml(self):
@@ -21,10 +28,14 @@ class TclResult:
mdx = xml.dom.minidom.parseString(self.raw_xml)
return mdx.toprettyxml(indent=" ")
class CheckResult(TclResult):
def __init__(self, xml: str):
super().__init__(xml)
root = ElementTree.fromstring(xml)
"""Handle check request result."""
def __init__(self, xmlstr: str):
"""Extract data from check request result."""
super().__init__(xmlstr)
root = ElementTree.fromstring(xmlstr)
self.curef = root.find("CUREF").text
self.fvver = root.find("VERSION").find("FV").text
self.tvver = root.find("VERSION").find("TV").text
@@ -35,10 +46,14 @@ class CheckResult(TclResult):
self.filesize = fileinfo.find("SIZE").text
self.filehash = fileinfo.find("CHECKSUM").text
class DownloadResult(TclResult):
def __init__(self, xml: str):
super().__init__(xml)
root = ElementTree.fromstring(xml)
"""Handle download request result."""
def __init__(self, xmlstr: str):
"""Extract data from download request result."""
super().__init__(xmlstr)
root = ElementTree.fromstring(xmlstr)
file = root.find("FILE_LIST").find("FILE")
self.fileid = file.find("FILE_ID").text
self.fileurl = file.find("DOWNLOAD_URL").text
@@ -53,16 +68,24 @@ class DownloadResult(TclResult):
self.encslaves = [s.text for s in enc_list]
self.s3_slaves = [s.text for s in s3_slave_list]
class ChecksumResult(TclResult):
def __init__(self, xml: str):
super().__init__(xml)
root = ElementTree.fromstring(xml)
"""Handle checksum request result."""
def __init__(self, xmlstr: str):
"""Extract data from checksum request result."""
super().__init__(xmlstr)
root = ElementTree.fromstring(xmlstr)
file = root.find("FILE_CHECKSUM_LIST").find("FILE")
self.file_addr = file.find("ADDRESS").text
self.sha1_enc_footer = file.find("ENCRYPT_FOOTER").text
self.sha1_footer = file.find("FOOTER").text
self.sha1_body = file.find("BODY").text
class EncryptHeaderResult(TclResult):
"""Handle encrypted header request result."""
def __init__(self, contents: str):
"""Extract data from encrypted header request result."""
self.rawdata = contents
-66
View File
@@ -1,66 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Tools to sort API servers to find the least awful one."""
import numpy
class ServerVoteMixin:
"""A mixin component for server sorting."""
def __init__(self):
"""Populate server list and weighting variables."""
self.g2master = None
self.master_servers = [
"g2master-us-east.tclclouds.com",
"g2master-us-west.tclclouds.com",
"g2master-eu-west.tclclouds.com",
"g2master-ap-south.tclclouds.com",
"g2master-ap-north.tclclouds.com",
"g2master-sa-east.tclclouds.com",
]
self.master_servers_weights = [3] * len(self.master_servers)
self.check_time_sum = 3
self.check_time_count = 1
def get_master_server(self):
"""Return weighted choice from server list."""
weight_sum = 0
for i in self.master_servers_weights:
weight_sum += i
numpy_weights = []
for i in self.master_servers_weights:
numpy_weights.append(i/weight_sum)
return numpy.random.choice(self.master_servers, p=numpy_weights)
def master_server_downvote(self):
"""Decrease weight of a server."""
idx = self.master_servers.index(self.g2master)
if self.master_servers_weights[idx] > 1:
self.master_servers_weights[idx] -= 1
def master_server_upvote(self):
"""Increase weight of a server."""
idx = self.master_servers.index(self.g2master)
if self.master_servers_weights[idx] < 10:
self.master_servers_weights[idx] += 1
def check_time_add(self, duration):
"""Record connection time."""
self.check_time_sum += duration
self.check_time_count += 1
def check_time_avg(self):
"""Return average connection time."""
return self.check_time_sum / self.check_time_count
def master_server_vote_on_time(self, last_duration):
"""Change weight of a server based on average connection time."""
avg_duration = self.check_time_avg()
if last_duration < avg_duration - 0.5:
self.master_server_upvote()
elif last_duration > avg_duration + 0.5:
self.master_server_downvote()
-94
View File
@@ -1,94 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Tools to interface with TCL's update request API."""
import time
from collections import OrderedDict, defaultdict
import requests
from defusedxml import ElementTree
from .devices import Device
class TclCheckMixin:
"""A mixin component for TCL's update request API."""
def prep_check_url(self, https=True):
"""Prepare URL for update request."""
protocol = "https://" if https else "http://"
url = protocol + self.g2master + "/check.php"
return url
def prep_check(self, device: Device, https=True):
"""Prepare URL and parameters for update request."""
url = self.prep_check_url(https)
params = OrderedDict()
params["id"] = device.imei
params["curef"] = device.curef
params["fv"] = device.fwver
params["mode"] = device.mode
params["type"] = device.type
params["cltp"] = device.cltp
params["cktp"] = device.cktp
params["rtd"] = device.rtd
params["chnl"] = device.chnl
#params["osvs"] = device.osvs
#params["ckot"] = device.ckot
return url, params
def do_check(self, device: Device, https=True, timeout=10, max_tries=5):
"""Perform update request with given parameters."""
url, params = self.prep_check(device, https)
last_response = None
for _ in range(0, max_tries):
try:
reqtime_start = time.perf_counter()
req = self.sess.get(url, params=params, timeout=timeout)
reqtime = time.perf_counter() - reqtime_start
self.check_time_add(reqtime)
last_response = req
if req.status_code == 200:
self.master_server_vote_on_time(reqtime)
req.encoding = "utf-8" # Force encoding as server doesn't give one
self.write_dump(req.text)
return req.text
elif req.status_code not in [500, 502, 503]:
self.do_check_errorhandle(req, reqtime)
except requests.exceptions.Timeout:
pass
# Something went wrong, try a different server
self.master_server_downvote()
self.g2master = self.get_master_server()
url = self.prep_check_url(https)
raise requests.exceptions.RetryError("Max tries ({}) reached.".format(max_tries), response=last_response)
def do_check_errorhandle(self, req, reqtime):
"""Handle non-HTTP 200 results for ``do_check``."""
errcodes = defaultdict(lambda: "HTTP {}.".format(req.status_code))
errcodes[204] = "No update available."
errcodes[404] = "No data for requested CUREF/FV combination."
if req.status_code in [204, 404]:
self.master_server_vote_on_time(reqtime)
elif req.status_code not in [500, 502, 503]:
self.master_server_downvote()
req.raise_for_status()
raise requests.exceptions.HTTPError(errcodes[req.status_code], response=req)
@staticmethod
def parse_check(xmlstr):
"""Parse output of ``do_check``."""
root = ElementTree.fromstring(xmlstr)
curef = root.find("CUREF").text
fvver = root.find("VERSION").find("FV").text
tvver = root.find("VERSION").find("TV").text
fw_id = root.find("FIRMWARE").find("FW_ID").text
fileinfo = root.find("FIRMWARE").find("FILESET").find("FILE")
fileid = fileinfo.find("FILE_ID").text
filename = fileinfo.find("FILENAME").text
filesize = fileinfo.find("SIZE").text
filehash = fileinfo.find("CHECKSUM").text
return curef, fvver, tvver, fw_id, fileid, filename, filesize, filehash
-56
View File
@@ -1,56 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Tools to interface with TCL's checksum API."""
import json
from defusedxml import ElementTree
from . import credentials
class TclChecksumMixin:
"""A mixin component for TCL's checksum API."""
@staticmethod
def prep_checksum(encslave, address, uri):
"""Prepare URL and parameters for checksum request."""
url = "http://" + encslave + "/checksum.php"
params = credentials.get_creds2()
payload = {address: uri}
payload_json = json.dumps(payload)
params[b"address"] = bytes(payload_json, "utf-8")
return url, params
def do_checksum(self, encslave, address, uri):
"""Perform checksum request with given parameters."""
url, params = self.prep_checksum(encslave, address, uri)
# print(repr(dict(params)))
req = self.sess.post(url, data=params)
if req.status_code == 200:
req.encoding = "utf-8" # Force encoding as server doesn't give one
self.write_dump(req.text)
# <ENCRYPT_FOOTER>2abfa6f6507044fec995efede5d818e62a0b19b5</ENCRYPT_FOOTER> means ERROR (invalid ADDRESS!)
if "<ENCRYPT_FOOTER>2abfa6f6507044fec995efede5d818e62a0b19b5</ENCRYPT_FOOTER>" in req.text:
print("INVALID URI: {}".format(uri))
raise SystemExit
return req.text
else:
print("CHECKSUM: " + repr(req))
print(repr(req.headers))
print(repr(req.text))
raise SystemExit
@staticmethod
def parse_checksum(xmlstr):
"""Parse output of ``do_checksum``."""
root = ElementTree.fromstring(xmlstr)
file = root.find("FILE_CHECKSUM_LIST").find("FILE")
file_addr = file.find("ADDRESS").text
sha1_enc_footer = file.find("ENCRYPT_FOOTER").text
sha1_footer = file.find("FOOTER").text
sha1_body = file.find("BODY").text
return file_addr, sha1_body, sha1_enc_footer, sha1_footer
-27
View File
@@ -1,27 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Tools to interface with TCL's encrypted header API."""
from . import credentials
class TclEncHeaderMixin:
"""A mixin component for TCL's encrypted header API.."""
def do_encrypt_header(self, encslave, address):
"""Perform encrypted header request with given parameters."""
params = credentials.get_creds2()
params[b"address"] = bytes(address, "utf-8")
url = "http://" + encslave + "/encrypt_header.php"
req = self.sess.post(url, data=params, verify=False)
# Expect "HTTP 206 Partial Content" response
if req.status_code == 206:
return req.content
else:
print("ENCRYPT: " + repr(req))
print(repr(req.headers))
print(repr(req.text))
raise SystemExit
-123
View File
@@ -1,123 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Tools to interface with TCL's download request API."""
import binascii
import hashlib
import random
import time
import zlib
from collections import OrderedDict
from math import floor
from defusedxml import ElementTree
'''
private HashMap<String, String> buildDownloadUrisParams(UpdatePackageInfo updatePackageInfo) {
FotaLog.m28v(TAG, "doAfterCheck");
String salt = FotaUtil.salt();
HashMap linkedHashMap = new LinkedHashMap();
linkedHashMap.put("id", this.internalBuilder.getParam("id"));
linkedHashMap.put("salt", salt);
linkedHashMap.put("curef", updatePackageInfo.mCuref);
linkedHashMap.put("fv", updatePackageInfo.mFv);
linkedHashMap.put("tv", updatePackageInfo.mTv);
linkedHashMap.put("type", "Firmware");
linkedHashMap.put("fw_id", updatePackageInfo.mFirmwareId);
linkedHashMap.put("mode", "2");
linkedHashMap.put("vk", generateVk2((LinkedHashMap) linkedHashMap.clone()));
linkedHashMap.put("cltp", "10");
linkedHashMap.put("cktp", this.internalBuilder.getParam("cktp"));
linkedHashMap.put("rtd", this.internalBuilder.getParam("rtd"));
linkedHashMap.put("chnl", this.internalBuilder.getParam("chnl"));
return linkedHashMap;
}
'''
VDKEY_B64Z = b"eJwdjwEOwDAIAr8kKFr//7HhmqXp8AIIDrYAgg8byiUXrwRJRXja+d6iNxu0AhUooDCN9rd6rDLxmGIakUVWo3IGCTRWqCAt6X4jGEIUAxgN0eYWnp+LkpHQAg/PsO90ELsy0Npm/n2HbtPndFgGEV31R9OmT4O4nrddjc3Qt6nWscx7e+WRHq5UnOudtjw5skuV09pFhvmqnOEIs4ljPeel1wfLYUF4\n"
def get_salt():
"""Generate cryptographic salt."""
millis = floor(time.time() * 1000)
tail = "{:06d}".format(random.randint(0, 999999))
return "{}{}".format(str(millis), tail)
def get_vk2(params_dict, cltp):
"""Generate salted hash of API parameters."""
params_dict["cltp"] = cltp
query = ""
for key, val in params_dict.items():
if query:
query += "&"
query += key + "=" + str(val)
vdk = zlib.decompress(binascii.a2b_base64(VDKEY_B64Z))
query += vdk.decode("utf-8")
engine = hashlib.sha1()
engine.update(bytes(query, "utf-8"))
hexhash = engine.hexdigest()
return hexhash
class TclRequestMixin:
"""A mixin component for TCL's download request API."""
def prep_request(self, curef, fvver, tvver, fw_id):
"""Prepare URL and device parameters for download request."""
url = "https://" + self.g2master + "/download_request.php"
params = OrderedDict()
params["id"] = self.serid
params["salt"] = get_salt()
params["curef"] = curef
params["fv"] = fvver
params["tv"] = tvver
params["type"] = self.ftype
params["fw_id"] = fw_id
params["mode"] = self.mode.value
params["vk"] = get_vk2(params, self.cltp.value)
params["cltp"] = self.cltp.value
params["cktp"] = self.cktp.value
params["rtd"] = self.rtd.value
if self.mode == self.MODE.FULL:
params["foot"] = 1
params["chnl"] = self.chnl.value
return url, params
def do_request(self, curef, fvver, tvver, fw_id):
"""Perform download request with given parameters."""
url, params = self.prep_request(curef, fvver, tvver, fw_id)
# print(repr(dict(params)))
req = self.sess.post(url, data=params)
if req.status_code == 200:
req.encoding = "utf-8" # Force encoding as server doesn't give one
self.write_dump(req.text)
return req.text
else:
print("REQUEST: " + repr(req))
print(repr(req.headers))
print(repr(req.text))
raise SystemExit
@staticmethod
def parse_request(xmlstr):
"""Parse output of ``do_request``."""
root = ElementTree.fromstring(xmlstr)
file = root.find("FILE_LIST").find("FILE")
fileid = file.find("FILE_ID").text
fileurl = file.find("DOWNLOAD_URL").text
s3_fileurl_node = file.find("S3_DOWNLOAD_URL")
s3_fileurl = ""
if s3_fileurl_node:
s3_fileurl = s3_fileurl_node.text
slave_list = root.find("SLAVE_LIST").findall("SLAVE")
enc_list = root.find("SLAVE_LIST").findall("ENCRYPT_SLAVE")
s3_slave_list = root.find("SLAVE_LIST").findall("S3_SLAVE")
slaves = [s.text for s in slave_list]
encslaves = [s.text for s in enc_list]
s3_slaves = [s.text for s in s3_slave_list]
return fileid, fileurl, slaves, encslaves, s3_fileurl, s3_slaves
-14
View File
@@ -1,14 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""XML tools."""
import xml.dom.minidom
def pretty_xml(xmlstr):
"""Prettify input XML with ``xml.dom.minidom``."""
mdx = xml.dom.minidom.parseString(xmlstr)
return mdx.toprettyxml(indent=" ")
Executable
+22
View File
@@ -0,0 +1,22 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0326,C0103
"""Update PRD database."""
import sys
from tcllib import argparser, devlist
dpdesc = """
Updates PRD software database if local copy is outdated.
"""
dp = argparser.DefaultParser(__file__, dpdesc)
dp.add_argument("-f", "--force", help="force database update", dest="force", action="store_true", default=False)
args = dp.parse_args(sys.argv[1:])
print("Updating device database...")
prds = devlist.get_devicelist(force=args.force)
del prds
Regular → Executable
+9
View File
@@ -9,9 +9,18 @@
import glob
import os
import sys
import requests
from tcllib import argparser
dpdesc = """
Uploads contents of logs folder to remote database.
"""
dp = argparser.DefaultParser(__file__, dpdesc)
args = dp.parse_args(sys.argv[1:])
del args
# This is the URL to an installation of https://github.com/mbirth/tcl_update_db
UPLOAD_URL = "https://tclota.birth-online.de/"
+17
View File
@@ -0,0 +1,17 @@
zhenghua.gao
qaruD4ok
SU_IN.Panasonic
KoD8naL2
TeleExtTest
t0523
teleserv
cGEek735
swupgrade
fwSwU338