Compare commits
28 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
caf4a6df52
|
|||
|
0436260abe
|
|||
|
5066499a4e
|
|||
|
d06feb6e4a
|
|||
|
6cbea7ed3a
|
|||
|
8780e60cb5
|
|||
| 375a87c4b8 | |||
| 4be09b5b3b | |||
| e4313a713f | |||
| d5b5c94f42 | |||
| 42a8c5653b | |||
| acf55d8e23 | |||
| 6866732133 | |||
|
8fb286b020
|
|||
|
7604fe2335
|
|||
| 5f0e334318 | |||
|
f3aae4ce7f
|
|||
|
995b451d11
|
|||
|
16d33b0cb6
|
|||
|
1707bcad26
|
|||
|
7d578b979b
|
|||
|
bfc7288f61
|
|||
| c29d32d39f | |||
| eef0f755e3 | |||
|
3059ff13f7
|
|||
|
75d23502a3
|
|||
|
bab58107fe
|
|||
|
e582642936
|
@@ -1,3 +1,8 @@
|
||||
[](https://scrutinizer-ci.com/g/mbirth/tcl_ota_check/?branch=master)
|
||||
[](https://scrutinizer-ci.com/g/mbirth/tcl_ota_check/build-status/master)
|
||||
[](https://scrutinizer-ci.com/code-intelligence)
|
||||
|
||||
|
||||
TCL OTA Check
|
||||
=============
|
||||
|
||||
@@ -75,22 +80,22 @@ Checks for the latest OTA (i.e. partial updates for over-the-air installation) v
|
||||
for all different models and variants.
|
||||
|
||||
|
||||
### tclcheck_findprd.py
|
||||
### tclfindprd.py
|
||||
|
||||
Scans for not yet known variants of a model.
|
||||
|
||||
|
||||
### tclcheck_findprd2.py
|
||||
### tclfindprd2.py
|
||||
|
||||
Scans for not yet known models.
|
||||
|
||||
|
||||
### tclcheck_findver.py
|
||||
### tclfindver.py
|
||||
|
||||
Scans for not yet known firmware versions.
|
||||
|
||||
|
||||
### tclcheck_gapfill.py
|
||||
### tclgapfill.py
|
||||
|
||||
Queries the [database server](https://tclota.birth-online.de/) for known versions and tries to find
|
||||
OTA files not yet in the database.
|
||||
@@ -106,6 +111,16 @@ Universal tool to query TCL's servers in different ways to manually check for a
|
||||
Queries the checksum for a specific FULL file.
|
||||
|
||||
|
||||
### tcldown.py
|
||||
|
||||
Downloads a firmware file from given file ID.
|
||||
|
||||
|
||||
### update_db.py
|
||||
|
||||
Updates local copy of database.
|
||||
|
||||
|
||||
### upload_logs.py
|
||||
|
||||
Uploads all collected server answers to the [database server](https://tclota.birth-online.de/).
|
||||
|
||||
+7
-4
@@ -6,14 +6,14 @@
|
||||
"""Checks for the latest FULL or OTA updates for specified PRD number."""
|
||||
|
||||
import os
|
||||
import random
|
||||
import sys
|
||||
|
||||
from tcllib import argparser
|
||||
from tcllib.devices import Device
|
||||
from tcllib.requests import RequestRunner, CheckRequest, DownloadRequest, \
|
||||
ChecksumRequest, EncryptHeaderRequest, ServerSelector, \
|
||||
write_info_if_dumps_found
|
||||
from tcllib.dumpmgr import write_info_if_dumps_found
|
||||
from tcllib.requests import (CheckRequest, ChecksumRequest, DownloadRequest,
|
||||
EncryptHeaderRequest, RequestRunner,
|
||||
ServerSelector)
|
||||
|
||||
|
||||
dpdesc = """
|
||||
@@ -33,6 +33,7 @@ args = dp.parse_args(sys.argv[1:])
|
||||
dev = Device(args.prd[0], args.fvver)
|
||||
dev.imei = "3531510"
|
||||
|
||||
|
||||
def sel_mode(txtmode, autoval, rawval):
|
||||
"""Handle custom mode."""
|
||||
if rawval:
|
||||
@@ -43,6 +44,7 @@ def sel_mode(txtmode, autoval, rawval):
|
||||
return dev.MODE_STATES["OTA"]
|
||||
return dev.MODE_STATES["FULL"]
|
||||
|
||||
|
||||
def sel_cltp(txtmode, autoval, rawval):
|
||||
"""Handle custom CLTP."""
|
||||
if rawval:
|
||||
@@ -53,6 +55,7 @@ def sel_cltp(txtmode, autoval, rawval):
|
||||
return dev.CLTP_STATES["DESKTOP"]
|
||||
return dev.CLTP_STATES["MOBILE"]
|
||||
|
||||
|
||||
if args.imei:
|
||||
print("Use specified IMEI: {}".format(args.imei))
|
||||
dev.imei = args.imei
|
||||
|
||||
Regular → Executable
+4
-3
@@ -9,7 +9,8 @@ import sys
|
||||
|
||||
from tcllib import ansi, argparser, devlist
|
||||
from tcllib.devices import DesktopDevice
|
||||
from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector, write_info_if_dumps_found
|
||||
from tcllib.dumpmgr import write_info_if_dumps_found
|
||||
from tcllib.requests import CheckRequest, RequestRunner, ServerVoteSelector
|
||||
|
||||
|
||||
dev = DesktopDevice()
|
||||
@@ -47,8 +48,8 @@ for prd, variant in prds.items():
|
||||
if result.tvver != lastver:
|
||||
txt_tv = "{} (old: {} / OTA: {})".format(
|
||||
ansi.CYAN + txt_tv + ansi.RESET,
|
||||
ansi.CYAN_DARK + variant["last_full"] + ansi.RESET,
|
||||
variant["last_ota"]
|
||||
ansi.CYAN_DARK + str(variant["last_full"]) + ansi.RESET,
|
||||
str(variant["last_ota"])
|
||||
)
|
||||
else:
|
||||
result.delete_dump()
|
||||
|
||||
Regular → Executable
+4
-1
@@ -9,7 +9,8 @@ import sys
|
||||
|
||||
from tcllib import ansi, argparser, devlist
|
||||
from tcllib.devices import MobileDevice
|
||||
from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector, write_info_if_dumps_found
|
||||
from tcllib.dumpmgr import write_info_if_dumps_found
|
||||
from tcllib.requests import CheckRequest, RequestRunner, ServerVoteSelector
|
||||
|
||||
|
||||
dev = MobileDevice()
|
||||
@@ -40,6 +41,8 @@ runner = RequestRunner(ServerVoteSelector())
|
||||
runner.max_tries = 20
|
||||
|
||||
for prd, variant in prds.items():
|
||||
if "PRD" not in prd:
|
||||
continue
|
||||
model = variant["variant"]
|
||||
lastver = variant["last_ota"]
|
||||
lastver = variant["last_full"] if lastver is None else lastver
|
||||
|
||||
@@ -0,0 +1,71 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# pylint: disable=C0111,C0326,C0103
|
||||
|
||||
"""Find all OTA updates for a given PRD."""
|
||||
|
||||
import sys
|
||||
|
||||
from tcllib import ansi, argparser
|
||||
from tcllib.devices import MobileDevice
|
||||
from tcllib.dumpmgr import write_info_if_dumps_found
|
||||
from tcllib.requests import CheckRequest, RequestRunner, ServerVoteSelector
|
||||
|
||||
|
||||
dpdesc = """
|
||||
Finds all valid OTA updates for a given PRD. Scan range can be set by
|
||||
startver and endver switches.
|
||||
"""
|
||||
dp = argparser.DefaultParser(__file__, dpdesc)
|
||||
dp.add_argument("prd", help="CU Reference #, e.g. PRD-63117-011")
|
||||
dp.add_argument("startver", help="Beginning of scan range", nargs="?", default="AAA000")
|
||||
dp.add_argument("endver", help="End of scan range", nargs="?", default="AAZ999")
|
||||
args = dp.parse_args(sys.argv[1:])
|
||||
|
||||
dev = MobileDevice()
|
||||
dev.curef = args.prd
|
||||
dev.mode = 3
|
||||
start_ver = args.startver
|
||||
end_ver = args.endver
|
||||
|
||||
print("Valid firmwares for model {} (between {} and {}):".format(dev.curef, start_ver, end_ver))
|
||||
|
||||
cur_ver = start_ver
|
||||
allvers = []
|
||||
while True:
|
||||
allvers.append(cur_ver)
|
||||
if cur_ver == end_ver:
|
||||
break
|
||||
letters = list(cur_ver[:3])
|
||||
num = int(cur_ver[3:6])
|
||||
num += 1
|
||||
if num > 999:
|
||||
num = 0
|
||||
for i in range(2, -1, -1):
|
||||
if letters[i] == "Z":
|
||||
letters[i] = "A"
|
||||
continue
|
||||
letters[i] = chr(ord(letters[i])+1)
|
||||
break
|
||||
cur_ver = "{:3}{:03d}".format("".join(letters), num)
|
||||
|
||||
runner = RequestRunner(ServerVoteSelector(), https=False)
|
||||
runner.max_tries = 20
|
||||
|
||||
done_count = 0
|
||||
total_count = len(allvers)
|
||||
for fv in allvers:
|
||||
done_count += 1
|
||||
print("Checking {} ({}/{})".format(fv, done_count, total_count))
|
||||
print(ansi.UP_DEL, end="")
|
||||
dev.fwver = fv
|
||||
chk = CheckRequest(dev)
|
||||
runner.run(chk)
|
||||
if chk.success:
|
||||
chkres = chk.get_result()
|
||||
txt_tv = chkres.tvver
|
||||
print("{}: {} ⇨ {} {}".format(dev.curef, fv, txt_tv, chkres.filehash))
|
||||
|
||||
print("Scan complete.")
|
||||
write_info_if_dumps_found()
|
||||
@@ -0,0 +1,71 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# pylint: disable=C0111,C0326,C0103
|
||||
|
||||
"""Find all OTA updates for a given PRD."""
|
||||
|
||||
import sys
|
||||
|
||||
from tcllib import ansi, argparser
|
||||
from tcllib.devices import MobileDevice
|
||||
from tcllib.dumpmgr import write_info_if_dumps_found
|
||||
from tcllib.requests import CheckRequest, RequestRunner, ServerVoteSelector
|
||||
|
||||
|
||||
dpdesc = """
|
||||
Finds all valid OTA updates for a given PRD. Scan range can be set by
|
||||
startver and endver switches.
|
||||
"""
|
||||
dp = argparser.DefaultParser(__file__, dpdesc)
|
||||
dp.add_argument("prd", help="CU Reference #, e.g. PRD-63117-011")
|
||||
dp.add_argument("startver", help="Beginning of scan range", nargs="?", default="AAA000")
|
||||
dp.add_argument("endver", help="End of scan range", nargs="?", default="AAZ999")
|
||||
args = dp.parse_args(sys.argv[1:])
|
||||
|
||||
dev = MobileDevice()
|
||||
dev.curef = args.prd
|
||||
dev.mode = 5
|
||||
start_ver = args.startver
|
||||
end_ver = args.endver
|
||||
|
||||
print("Valid firmwares for model {} (between {} and {}):".format(dev.curef, start_ver, end_ver))
|
||||
|
||||
cur_ver = start_ver
|
||||
allvers = []
|
||||
while True:
|
||||
allvers.append(cur_ver)
|
||||
if cur_ver == end_ver:
|
||||
break
|
||||
letters = list(cur_ver[:3])
|
||||
num = int(cur_ver[3:6])
|
||||
num += 1
|
||||
if num > 999:
|
||||
num = 0
|
||||
for i in range(2, -1, -1):
|
||||
if letters[i] == "Z":
|
||||
letters[i] = "A"
|
||||
continue
|
||||
letters[i] = chr(ord(letters[i])+1)
|
||||
break
|
||||
cur_ver = "{:3}{:03d}".format("".join(letters), num)
|
||||
|
||||
runner = RequestRunner(ServerVoteSelector(), https=False)
|
||||
runner.max_tries = 20
|
||||
|
||||
done_count = 0
|
||||
total_count = len(allvers)
|
||||
for fv in allvers:
|
||||
done_count += 1
|
||||
print("Checking {} ({}/{})".format(fv, done_count, total_count))
|
||||
print(ansi.UP_DEL, end="")
|
||||
dev.fwver = fv
|
||||
chk = CheckRequest(dev)
|
||||
runner.run(chk)
|
||||
if chk.success:
|
||||
chkres = chk.get_result()
|
||||
txt_tv = chkres.tvver
|
||||
print("{}: {} ⇨ {} {}".format(dev.curef, fv, txt_tv, chkres.filehash))
|
||||
|
||||
print("Scan complete.")
|
||||
write_info_if_dumps_found()
|
||||
@@ -0,0 +1,71 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# pylint: disable=C0111,C0326,C0103
|
||||
|
||||
"""Find all OTA updates for a given PRD."""
|
||||
|
||||
import sys
|
||||
|
||||
from tcllib import ansi, argparser
|
||||
from tcllib.devices import MobileDevice
|
||||
from tcllib.dumpmgr import write_info_if_dumps_found
|
||||
from tcllib.requests import CheckRequest, RequestRunner, ServerVoteSelector
|
||||
|
||||
|
||||
dpdesc = """
|
||||
Finds all valid OTA updates for a given PRD. Scan range can be set by
|
||||
startver and endver switches.
|
||||
"""
|
||||
dp = argparser.DefaultParser(__file__, dpdesc)
|
||||
dp.add_argument("prd", help="CU Reference #, e.g. PRD-63117-011")
|
||||
dp.add_argument("startver", help="Beginning of scan range", nargs="?", default="AAA000")
|
||||
dp.add_argument("endver", help="End of scan range", nargs="?", default="AAZ999")
|
||||
args = dp.parse_args(sys.argv[1:])
|
||||
|
||||
dev = MobileDevice()
|
||||
dev.curef = args.prd
|
||||
dev.mode = 6
|
||||
start_ver = args.startver
|
||||
end_ver = args.endver
|
||||
|
||||
print("Valid firmwares for model {} (between {} and {}):".format(dev.curef, start_ver, end_ver))
|
||||
|
||||
cur_ver = start_ver
|
||||
allvers = []
|
||||
while True:
|
||||
allvers.append(cur_ver)
|
||||
if cur_ver == end_ver:
|
||||
break
|
||||
letters = list(cur_ver[:3])
|
||||
num = int(cur_ver[3:6])
|
||||
num += 1
|
||||
if num > 999:
|
||||
num = 0
|
||||
for i in range(2, -1, -1):
|
||||
if letters[i] == "Z":
|
||||
letters[i] = "A"
|
||||
continue
|
||||
letters[i] = chr(ord(letters[i])+1)
|
||||
break
|
||||
cur_ver = "{:3}{:03d}".format("".join(letters), num)
|
||||
|
||||
runner = RequestRunner(ServerVoteSelector(), https=False)
|
||||
runner.max_tries = 20
|
||||
|
||||
done_count = 0
|
||||
total_count = len(allvers)
|
||||
for fv in allvers:
|
||||
done_count += 1
|
||||
print("Checking {} ({}/{})".format(fv, done_count, total_count))
|
||||
print(ansi.UP_DEL, end="")
|
||||
dev.fwver = fv
|
||||
chk = CheckRequest(dev)
|
||||
runner.run(chk)
|
||||
if chk.success:
|
||||
chkres = chk.get_result()
|
||||
txt_tv = chkres.tvver
|
||||
print("{}: {} ⇨ {} {}".format(dev.curef, fv, txt_tv, chkres.filehash))
|
||||
|
||||
print("Scan complete.")
|
||||
write_info_if_dumps_found()
|
||||
+128
@@ -0,0 +1,128 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# pylint: disable=C0111,C0326,C0103
|
||||
|
||||
"""Checks for the latest FULL or OTA updates for specified PRD number."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
from tcllib import argparser
|
||||
from tcllib.devices import Device
|
||||
from tcllib.dumpmgr import write_info_if_dumps_found
|
||||
from tcllib.requests import (CheckNewRequest, ChecksumRequest, DownloadRequest,
|
||||
EncryptHeaderRequest, RequestRunner,
|
||||
ServerSelector)
|
||||
|
||||
|
||||
dpdesc = """
|
||||
Checks for the latest FULL updates for the specified PRD number or for an OTA from the
|
||||
version specified as fvver.
|
||||
"""
|
||||
dp = argparser.DefaultParser(__file__, dpdesc)
|
||||
dp.add_argument("prd", nargs=1, help="CU Reference #, e.g. PRD-63117-011")
|
||||
dp.add_argument("fvver", nargs="?", help="Firmware version to check for OTA updates, e.g. AAM481 (omit to run FULL check)", default="AAA000")
|
||||
dp.add_argument("-i", "--imei", help="use specified IMEI instead of default", type=str)
|
||||
dp.add_argument("-m", "--mode", help="force type of update to check for", default="auto", type=str, choices=["full", "ota"])
|
||||
dp.add_argument("-t", "--type", help="force type of check to run", default="auto", type=str, choices=["desktop", "mobile"])
|
||||
dp.add_argument("--rawmode", help="override --mode with raw value (2=OTA, 4=FULL)", metavar="MODE")
|
||||
dp.add_argument("--rawcltp", help="override --type with raw value (10=MOBILE, 2010=DESKTOP)", metavar="CLTP")
|
||||
args = dp.parse_args(sys.argv[1:])
|
||||
|
||||
dev = Device(args.prd[0], args.fvver)
|
||||
dev.imei = "3531510"
|
||||
|
||||
|
||||
def sel_mode(txtmode, autoval, rawval):
|
||||
"""Handle custom mode."""
|
||||
if rawval:
|
||||
return rawval
|
||||
if txtmode == "auto":
|
||||
return autoval
|
||||
elif txtmode == "ota":
|
||||
return dev.MODE_STATES["OTA"]
|
||||
return dev.MODE_STATES["FULL"]
|
||||
|
||||
|
||||
def sel_cltp(txtmode, autoval, rawval):
|
||||
"""Handle custom CLTP."""
|
||||
if rawval:
|
||||
return rawval
|
||||
if txtmode == "auto":
|
||||
return autoval
|
||||
elif txtmode == "desktop":
|
||||
return dev.CLTP_STATES["DESKTOP"]
|
||||
return dev.CLTP_STATES["MOBILE"]
|
||||
|
||||
|
||||
if args.imei:
|
||||
print("Use specified IMEI: {}".format(args.imei))
|
||||
dev.imei = args.imei
|
||||
|
||||
if args.fvver == "AAA000":
|
||||
dev.mode = sel_mode(args.mode, dev.MODE_STATES["FULL"], args.rawmode)
|
||||
dev.cltp = sel_cltp(args.type, dev.CLTP_STATES["DESKTOP"], args.rawcltp)
|
||||
else:
|
||||
dev.mode = sel_mode(args.mode, dev.MODE_STATES["OTA"], args.rawmode)
|
||||
dev.cltp = sel_cltp(args.type, dev.CLTP_STATES["MOBILE"], args.rawcltp)
|
||||
|
||||
print("Mode: {}".format(dev.mode))
|
||||
print("CLTP: {}".format(dev.cltp))
|
||||
|
||||
runner = RequestRunner(ServerSelector())
|
||||
|
||||
# Check for update
|
||||
chk = CheckNewRequest(dev)
|
||||
runner.run(chk)
|
||||
if not chk.success:
|
||||
print("{}".format(chk.error))
|
||||
sys.exit(2)
|
||||
chkres = chk.get_result()
|
||||
print(chkres.pretty_xml())
|
||||
|
||||
# Request download
|
||||
dlr = DownloadRequest(dev, chkres.tvver, chkres.fw_id)
|
||||
runner.run(dlr)
|
||||
if not dlr.success:
|
||||
print("{}".format(dlr.error))
|
||||
sys.exit(3)
|
||||
dlrres = dlr.get_result()
|
||||
print(dlrres.pretty_xml())
|
||||
|
||||
if dlrres.encslaves:
|
||||
encrunner = RequestRunner(ServerSelector(dlrres.encslaves), https=False)
|
||||
cks = ChecksumRequest(dlrres.fileurl, dlrres.fileurl)
|
||||
encrunner.run(cks)
|
||||
if not cks.success:
|
||||
print("{}".format(cks.error))
|
||||
sys.exit(4)
|
||||
cksres = cks.get_result()
|
||||
print(cksres.pretty_xml())
|
||||
|
||||
for s in dlrres.slaves:
|
||||
print("http://{}{}".format(s, dlrres.fileurl))
|
||||
|
||||
for s in dlrres.s3_slaves:
|
||||
print("http://{}{}".format(s, dlrres.s3_fileurl))
|
||||
|
||||
if dev.mode == dev.MODE_STATES["FULL"]:
|
||||
hdr = EncryptHeaderRequest(dlrres.fileurl)
|
||||
encrunner.run(hdr)
|
||||
if not hdr.success:
|
||||
print("{}".format(hdr.error))
|
||||
sys.exit(5)
|
||||
hdrres = hdr.get_result()
|
||||
headname = "header_{}.bin".format(chkres.tvver)
|
||||
headdir = "headers"
|
||||
if not os.path.exists(headdir):
|
||||
os.makedirs(headdir)
|
||||
if len(hdrres.rawdata) == 4194320:
|
||||
# TODO: Check sha1sum
|
||||
print("Header length check passed. Writing to {}.".format(headname))
|
||||
with open(os.path.join(headdir, headname), "wb") as f:
|
||||
f.write(hdrres.rawdata)
|
||||
else:
|
||||
print("Header length invalid ({}).".format(len(hdrres.rawdata)))
|
||||
|
||||
write_info_if_dumps_found()
|
||||
Regular → Executable
+1
-1
@@ -8,7 +8,7 @@
|
||||
import sys
|
||||
|
||||
from tcllib import argparser
|
||||
from tcllib.requests import RequestRunner, ChecksumRequest, ServerSelector
|
||||
from tcllib.requests import ChecksumRequest, RequestRunner, ServerSelector
|
||||
|
||||
encslaves = [
|
||||
"54.238.56.196",
|
||||
|
||||
Regular → Executable
+4
-3
@@ -10,9 +10,9 @@ import sys
|
||||
|
||||
from tcllib import argparser
|
||||
from tcllib.devices import DesktopDevice
|
||||
from tcllib.requests import RequestRunner, CheckRequest, DownloadRequest, \
|
||||
ChecksumRequest, EncryptHeaderRequest, ServerSelector, \
|
||||
write_info_if_dumps_found
|
||||
from tcllib.dumpmgr import write_info_if_dumps_found
|
||||
from tcllib.requests import (DownloadRequest, EncryptHeaderRequest,
|
||||
RequestRunner, ServerSelector)
|
||||
|
||||
|
||||
dpdesc = """
|
||||
@@ -31,6 +31,7 @@ args = dp.parse_args(sys.argv[1:])
|
||||
|
||||
dev = DesktopDevice()
|
||||
|
||||
|
||||
def sel_mode(defaultmode, rawval):
|
||||
"""Handle custom mode."""
|
||||
if rawval:
|
||||
|
||||
@@ -10,19 +10,20 @@ import sys
|
||||
|
||||
from tcllib import ansi, argparser, devlist
|
||||
from tcllib.devices import DesktopDevice
|
||||
from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector, \
|
||||
write_info_if_dumps_found
|
||||
|
||||
from tcllib.dumpmgr import write_info_if_dumps_found
|
||||
from tcllib.requests import CheckRequest, RequestRunner, ServerVoteSelector
|
||||
|
||||
dpdesc = """
|
||||
Finds new PRD numbers for all known variants, or specified variants with tocheck. Scan range
|
||||
can be set by floor and ceiling switches.
|
||||
"""
|
||||
dp = argparser.DefaultParser(__file__, dpdesc)
|
||||
dp.add_argument("tocheck", help="CU Reference # to filter scan results", nargs="?", default=None)
|
||||
dp.add_argument("tocheck", help="CU Reference #(s) to filter scan results", nargs="*", default=None)
|
||||
dp.add_argument("-f", "--floor", help="Beginning of scan range", dest="floor", nargs="?", type=int, default=0)
|
||||
dp.add_argument("-c", "--ceiling", help="End of scan range", dest="ceiling", nargs="?", type=int, default=999)
|
||||
dp.add_argument("-l", "--local", help="Force using local database", dest="local", action="store_true", default=False)
|
||||
dp.add_argument("-np", "--no-prefix", help="Skip 'PRD-' prefix", dest="noprefix", action="store_true", default=False)
|
||||
dp.add_argument("-k2", "--key2", help="V2 syntax", dest="key2mode", action="store_true", default=False)
|
||||
args = dp.parse_args(sys.argv[1:])
|
||||
|
||||
floor = args.floor
|
||||
@@ -37,27 +38,37 @@ print(" OK")
|
||||
|
||||
print("Valid PRDs not already in database:")
|
||||
|
||||
prds = [x.replace("PRD-", "").split("-") for x in prd_db]
|
||||
prdx = list({x[0]: x[1]} for x in prds)
|
||||
prds = [x.replace("APBI-PRD", "").replace("PRD-", "").replace("-", "") for x in prd_db]
|
||||
prdx = list({x[0:5]: x[5:]} for x in prds)
|
||||
prddict = collections.defaultdict(list)
|
||||
for prdc in prdx:
|
||||
for key, value in prdc.items():
|
||||
prddict[key].append(value)
|
||||
|
||||
if args.tocheck is not None:
|
||||
args.tocheck = args.tocheck.replace("PRD-", "")
|
||||
if not isinstance(args.tocheck, list):
|
||||
args.tocheck = [args.tocheck]
|
||||
args.tocheck = [toch.replace("APBI-PRD", "").replace("PRD-", "") for toch in args.tocheck]
|
||||
prdkeys = list(prddict.keys())
|
||||
for k in prdkeys:
|
||||
if k != args.tocheck:
|
||||
if k not in args.tocheck:
|
||||
del prddict[k]
|
||||
if not prddict:
|
||||
prddict[args.tocheck] = []
|
||||
for toch in args.tocheck:
|
||||
if toch not in prddict.keys():
|
||||
prddict[toch] = []
|
||||
|
||||
dev = DesktopDevice()
|
||||
|
||||
runner = RequestRunner(ServerVoteSelector(), https=False)
|
||||
runner.max_tries = 20
|
||||
|
||||
if args.key2mode:
|
||||
prefix = "APBI-PRD"
|
||||
suffix = ""
|
||||
else:
|
||||
prefix = "" if args.noprefix else "PRD-"
|
||||
suffix = "-"
|
||||
|
||||
for center in sorted(prddict.keys()):
|
||||
tails = [int(i) for i in prddict[center]]
|
||||
safes = [g for g in range(floor, ceiling) if g not in tails]
|
||||
@@ -65,7 +76,7 @@ for center in sorted(prddict.keys()):
|
||||
done_count = 0
|
||||
print("Checking {} variant codes for model {}.".format(total_count, center))
|
||||
for j in safes:
|
||||
curef = "PRD-{}-{:03}".format(center, j)
|
||||
curef = "{}{}{}{:03}".format(prefix, center, suffix, j)
|
||||
done_count += 1
|
||||
print("Checking {} ({}/{})".format(curef, done_count, total_count))
|
||||
print(ansi.UP_DEL, end="")
|
||||
Regular → Executable
+8
-4
@@ -9,8 +9,8 @@ import sys
|
||||
|
||||
from tcllib import ansi, argparser, devlist
|
||||
from tcllib.devices import DesktopDevice
|
||||
from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector, \
|
||||
write_info_if_dumps_found
|
||||
from tcllib.dumpmgr import write_info_if_dumps_found
|
||||
from tcllib.requests import CheckRequest, RequestRunner, ServerVoteSelector
|
||||
|
||||
|
||||
# Variants to scan for
|
||||
@@ -24,6 +24,7 @@ dp = argparser.DefaultParser(__file__, dpdesc)
|
||||
dp.add_argument("floor", nargs="?", help="Model number to start with", type=int, default=63116)
|
||||
dp.add_argument("ceiling", nargs="?", help="Model number to end with", type=int, default=99999)
|
||||
dp.add_argument("-l", "--local", help="Force using local database", dest="local", action="store_true", default=False)
|
||||
dp.add_argument("-k2", "--key2", help="V2 syntax", dest="key2mode", action="store_true", default=False)
|
||||
args = dp.parse_args(sys.argv[1:])
|
||||
|
||||
floor = args.floor
|
||||
@@ -38,7 +39,7 @@ print(" OK")
|
||||
|
||||
print("Valid PRDs not already in database:")
|
||||
|
||||
known_centers = set(int(x.replace("PRD-", "").split("-")[0]) for x in prd_db)
|
||||
known_centers = set(int(x.replace("PRD-", "").replace("APBI-PRD", "")[0:5]) for x in prd_db)
|
||||
scan_list = set(range(floor, ceiling))
|
||||
|
||||
to_scan = scan_list - known_centers
|
||||
@@ -52,7 +53,10 @@ runner.max_tries = 20
|
||||
|
||||
for center in to_scan:
|
||||
for j in SCAN_VARIANTS:
|
||||
curef = "PRD-{:05}-{:3}".format(center, j)
|
||||
if args.key2mode:
|
||||
curef = "APBI-PRD{:05}{:3}".format(center, j)
|
||||
else:
|
||||
curef = "PRD-{:05}-{:3}".format(center, j)
|
||||
done_count += 1
|
||||
print("Checking {} ({}/{})".format(curef, done_count, total_count))
|
||||
print(ansi.UP_DEL, end="")
|
||||
@@ -7,10 +7,10 @@
|
||||
|
||||
import sys
|
||||
|
||||
from tcllib import ansi, argparser, devlist
|
||||
from tcllib import ansi, argparser
|
||||
from tcllib.devices import MobileDevice
|
||||
from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector, \
|
||||
write_info_if_dumps_found
|
||||
from tcllib.dumpmgr import write_info_if_dumps_found
|
||||
from tcllib.requests import CheckRequest, RequestRunner, ServerVoteSelector
|
||||
|
||||
|
||||
dpdesc = """
|
||||
Regular → Executable
+22
-5
@@ -6,12 +6,22 @@
|
||||
"""Query existence of missing OTAs."""
|
||||
|
||||
import json
|
||||
import sys
|
||||
|
||||
import requests
|
||||
|
||||
from tcllib import ansi
|
||||
from tcllib import argparser
|
||||
from tcllib.devices import MobileDevice
|
||||
from tcllib.requests import RequestRunner, CheckRequest, ServerVoteSelector, \
|
||||
write_info_if_dumps_found
|
||||
from tcllib.dumpmgr import write_info_if_dumps_found
|
||||
from tcllib.requests import CheckRequest, RequestRunner, ServerVoteSelector
|
||||
|
||||
|
||||
dpdesc = """
|
||||
Queries the database server for known versions and tries to find OTA files not yet in the database.
|
||||
"""
|
||||
dp = argparser.DefaultParser(__file__, dpdesc)
|
||||
args = dp.parse_args(sys.argv[1:])
|
||||
del args
|
||||
|
||||
|
||||
# 1. Fetch list of missing OTAs (e.g. from ancient versions to current)
|
||||
@@ -43,8 +53,15 @@ for prd, data in versions.items():
|
||||
chk = CheckRequest(dev)
|
||||
runner.run(chk)
|
||||
if chk.success:
|
||||
print("✔", end="", flush=True)
|
||||
num_item += 1
|
||||
if chk.result.tvver == data["latest_ota"]:
|
||||
print("✔", end="", flush=True)
|
||||
num_item += 1
|
||||
elif chk.result.tvver in data["update_map"] and ver in data["update_map"][chk.result.tvver]:
|
||||
# Delete dump as we already know the information
|
||||
chk.result.delete_dump()
|
||||
print("%", end="", flush=True)
|
||||
else:
|
||||
print("~", end="", flush=True)
|
||||
else:
|
||||
print("✖", end="", flush=True)
|
||||
print("")
|
||||
+5
-30
@@ -1,35 +1,10 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# pylint: disable=C0111,C0326,C0103
|
||||
|
||||
"""Library for TCL API work and related functions."""
|
||||
|
||||
import requests
|
||||
|
||||
from . import (dumpmgr, servervote, tclcheck, tclchecksum, tclencheader,
|
||||
tclrequest)
|
||||
|
||||
|
||||
class FotaCheck(
|
||||
tclcheck.TclCheckMixin,
|
||||
tclrequest.TclRequestMixin,
|
||||
tclchecksum.TclChecksumMixin,
|
||||
tclencheader.TclEncHeaderMixin,
|
||||
servervote.ServerVoteMixin,
|
||||
dumpmgr.DumpMgrMixin
|
||||
):
|
||||
"""Main API handler class."""
|
||||
|
||||
def __init__(self):
|
||||
"""Handle mixins and populate variables."""
|
||||
super().__init__()
|
||||
self.reset_session()
|
||||
|
||||
def reset_session(self, device=None):
|
||||
"""Reset everything to default."""
|
||||
self.g2master = self.get_master_server()
|
||||
self.sess = requests.Session()
|
||||
if device:
|
||||
self.sess.headers.update({"User-Agent": device.ua})
|
||||
return self.sess
|
||||
from .ansi import *
|
||||
from .argparser import *
|
||||
from .devices import *
|
||||
from .devlist import *
|
||||
from .dumpmgr import *
|
||||
|
||||
+3
-2
@@ -1,3 +1,4 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""Pseudo-devices for desktop/mobile requests"""
|
||||
@@ -27,7 +28,7 @@ class Device():
|
||||
self.chnl = self.CHNL_STATES["WIFI"]
|
||||
self.cktp = self.CKTP_STATES["MANUAL"]
|
||||
self.ckot = self.CKOT_STATES["ALL"]
|
||||
self.ua = "tcl"
|
||||
self.uagent = "tcl"
|
||||
|
||||
def is_rooted(self):
|
||||
"""Get RTD as boolean."""
|
||||
@@ -71,7 +72,7 @@ class MobileDevice(Device):
|
||||
self.imei = "3531510"
|
||||
self.set_cltp("MOBILE")
|
||||
self.set_mode("OTA")
|
||||
self.ua = "com.tcl.fota/5.1.0.2.0029.0, Android"
|
||||
self.uagent = "com.tcl.fota/5.1.0.2.0029.0, Android"
|
||||
|
||||
|
||||
class DesktopDevice(Device):
|
||||
|
||||
+38
-20
@@ -19,7 +19,7 @@ DEVICELIST_FILE = "prds.json"
|
||||
DEVICELIST_CACHE_SECONDS = 86400
|
||||
|
||||
|
||||
def load_local_devicelist():
|
||||
def _load_local_devicelist():
|
||||
"""Load local devicelist and return decoded JSON (or None) and need_download status."""
|
||||
need_download = True
|
||||
try:
|
||||
@@ -29,22 +29,18 @@ def load_local_devicelist():
|
||||
need_download = False
|
||||
with open(DEVICELIST_FILE, "rt") as dlfile:
|
||||
return json.load(dlfile), need_download
|
||||
except FileNotFoundError:
|
||||
except (FileNotFoundError, json.decoder.JSONDecodeError):
|
||||
return None, True
|
||||
|
||||
|
||||
def get_devicelist(force=False, output_diff=True, local=False):
|
||||
"""Return device list from saved database."""
|
||||
old_prds, need_download = load_local_devicelist()
|
||||
|
||||
if local:
|
||||
return old_prds
|
||||
|
||||
if need_download or force:
|
||||
def _download_devicelist(doit: bool):
|
||||
"""Download device list if doit is set. Or do nothing."""
|
||||
if doit:
|
||||
prds_json = requests.get(DEVICELIST_URL).text
|
||||
with open(DEVICELIST_FILE, "wt") as dlfile:
|
||||
dlfile.write(prds_json)
|
||||
|
||||
def _load_devicelist_with_diff(output_diff: bool, old_prds: dict = {}) -> dict:
|
||||
"""Load local devicelist and output diff if requested."""
|
||||
with open(DEVICELIST_FILE, "rt") as dlfile:
|
||||
prds = json.load(dlfile)
|
||||
|
||||
@@ -53,8 +49,19 @@ def get_devicelist(force=False, output_diff=True, local=False):
|
||||
|
||||
return prds
|
||||
|
||||
def get_devicelist(force: bool=False, output_diff: bool=True, local: bool=False) -> dict:
|
||||
"""Return device list from saved database."""
|
||||
old_prds, need_download = _load_local_devicelist()
|
||||
|
||||
def print_versions_diff(old_data, new_data):
|
||||
if local:
|
||||
return old_prds
|
||||
|
||||
_download_devicelist(need_download or force)
|
||||
|
||||
return _load_devicelist_with_diff(output_diff, old_prds)
|
||||
|
||||
|
||||
def print_versions_diff(old_data: dict, new_data: dict):
|
||||
"""Print version changes between old and new databases."""
|
||||
prd = new_data["curef"]
|
||||
if new_data["last_full"] != old_data["last_full"] and new_data["last_ota"] != old_data["last_ota"]:
|
||||
@@ -70,17 +77,28 @@ def print_versions_diff(old_data, new_data):
|
||||
elif new_data["last_ota"] != old_data["last_ota"]:
|
||||
print("> {}: {} ⇨ {} (OTA)".format(prd, ansi.YELLOW_DARK + str(old_data["last_ota"]) + ansi.RESET, ansi.YELLOW + str(new_data["last_ota"]) + ansi.RESET))
|
||||
|
||||
|
||||
def print_prd_diff(old_prds, new_prds):
|
||||
"""Print PRD changes between old and new databases."""
|
||||
added_prds = [prd for prd in new_prds if prd not in old_prds]
|
||||
removed_prds = [prd for prd in old_prds if prd not in new_prds]
|
||||
def _print_removed_prds(prds_data: dict, removed_prds: list):
|
||||
"""Print details of selected PRDs as removed."""
|
||||
for prd in removed_prds:
|
||||
print("> Removed device {} (was at {} / OTA: {}).".format(ansi.RED + prd + ansi.RESET, old_prds[prd]["last_full"], old_prds[prd]["last_ota"]))
|
||||
print("> Removed device {} (was at {} / OTA: {}).".format(ansi.RED + prd + ansi.RESET, prds_data[prd]["last_full"], prds_data[prd]["last_ota"]))
|
||||
|
||||
def _print_added_prds(prds_data: dict, added_prds: list):
|
||||
"""Print details of selected PRDs as added."""
|
||||
for prd in added_prds:
|
||||
print("> New device {} ({} / OTA: {}).".format(ansi.GREEN + prd + ansi.RESET, new_prds[prd]["last_full"], new_prds[prd]["last_ota"]))
|
||||
print("> New device {} ({} / OTA: {}).".format(ansi.GREEN + prd + ansi.RESET, prds_data[prd]["last_full"], prds_data[prd]["last_ota"]))
|
||||
|
||||
def _print_changed_prds(old_prds: dict, new_prds: dict, skip_prds: list):
|
||||
"""Print details of changed PRDs."""
|
||||
for prd, pdata in new_prds.items():
|
||||
if prd in added_prds:
|
||||
if prd in skip_prds:
|
||||
continue
|
||||
odata = old_prds[prd]
|
||||
print_versions_diff(odata, pdata)
|
||||
|
||||
def print_prd_diff(old_prds: dict, new_prds: dict):
|
||||
"""Print PRD changes between old and new databases."""
|
||||
added_prds = [prd for prd in new_prds if prd not in old_prds]
|
||||
removed_prds = [prd for prd in old_prds if prd not in new_prds]
|
||||
_print_removed_prds(old_prds, removed_prds)
|
||||
_print_added_prds(new_prds, added_prds)
|
||||
_print_changed_prds(old_prds, new_prds, added_prds)
|
||||
|
||||
+21
-21
@@ -15,23 +15,34 @@ from math import floor
|
||||
from . import ansi
|
||||
|
||||
|
||||
class DumpMgrMixin:
|
||||
"""A mixin component for XML dump management."""
|
||||
def get_timestamp_random():
|
||||
"""Generate timestamp + random part to avoid collisions."""
|
||||
millis = floor(time.time() * 1000)
|
||||
tail = "{:06d}".format(random.randint(0, 999999))
|
||||
return "{}_{}".format(str(millis), tail)
|
||||
|
||||
|
||||
def write_info_if_dumps_found():
|
||||
"""Notify user to upload dumps if present."""
|
||||
# To disable this info, uncomment the following line.
|
||||
# return
|
||||
files = glob.glob(os.path.normpath("logs/*.xml"))
|
||||
if files:
|
||||
print()
|
||||
print("{}There are {} logs collected in the logs/ directory.{} Please consider uploading".format(ansi.YELLOW, len(files), ansi.RESET))
|
||||
print("them to https://tclota.birth-online.de/ by running {}./upload_logs.py{}.".format(ansi.CYAN, ansi.RESET))
|
||||
|
||||
|
||||
class DumpMgr:
|
||||
"""A class for XML dump management."""
|
||||
|
||||
def __init__(self):
|
||||
"""Populate dump file name."""
|
||||
self.last_dump_filename = None
|
||||
|
||||
@staticmethod
|
||||
def get_timestamp_random():
|
||||
"""Generate timestamp + random part to avoid collisions."""
|
||||
millis = floor(time.time() * 1000)
|
||||
tail = "{:06d}".format(random.randint(0, 999999))
|
||||
return "{}_{}".format(str(millis), tail)
|
||||
|
||||
def write_dump(self, data):
|
||||
"""Write dump to file."""
|
||||
outfile = os.path.normpath("logs/{}.xml".format(self.get_timestamp_random()))
|
||||
outfile = os.path.normpath("logs/{}.xml".format(get_timestamp_random()))
|
||||
if not os.path.exists(os.path.dirname(outfile)):
|
||||
try:
|
||||
os.makedirs(os.path.dirname(outfile))
|
||||
@@ -47,14 +58,3 @@ class DumpMgrMixin:
|
||||
if self.last_dump_filename:
|
||||
os.unlink(self.last_dump_filename)
|
||||
self.last_dump_filename = None
|
||||
|
||||
@staticmethod
|
||||
def write_info_if_dumps_found():
|
||||
"""Notify user to upload dumps if present."""
|
||||
# To disable this info, uncomment the following line.
|
||||
# return
|
||||
files = glob.glob(os.path.normpath("logs/*.xml"))
|
||||
if files:
|
||||
print()
|
||||
print("{}There are {} logs collected in the logs/ directory.{} Please consider uploading".format(ansi.YELLOW, len(files), ansi.RESET))
|
||||
print("them to https://tclota.birth-online.de/ by running {}./upload_logs.py{}.".format(ansi.CYAN, ansi.RESET))
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""Library for generic TCL API requests."""
|
||||
|
||||
from .checkrequest import CheckRequest
|
||||
from .downloadrequest import DownloadRequest
|
||||
from .checknewrequest import CheckNewRequest
|
||||
from .checksumrequest import ChecksumRequest
|
||||
from .downloadrequest import DownloadRequest
|
||||
from .encryptheaderrequest import EncryptHeaderRequest
|
||||
from .runner import *
|
||||
from .serverselector import *
|
||||
from .dumpmgr import write_info_if_dumps_found
|
||||
|
||||
@@ -0,0 +1,104 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""Generic update check request."""
|
||||
|
||||
import binascii
|
||||
import hashlib
|
||||
import random
|
||||
import time
|
||||
import zlib
|
||||
from collections import OrderedDict
|
||||
from math import floor
|
||||
|
||||
from .. import devices
|
||||
from .tclrequest import TclRequest
|
||||
from .tclresult import CheckResult
|
||||
|
||||
VDKEY = "010010000110111101110111001000000110000101110010011001010010000001111001011011110111010100100000011001110110010101110100001000000111010001101000011010010111001100100000011010110110010101111001001000000111011101101111011100100110010000111111"
|
||||
VDKEY_B64Z = b"eJwdjwEOwDAIAr8kKFr//7HhmqXp8AIIDrYAgg8byiUXrwRJRXja+d6iNxu0AhUooDCN9rd6rDLxmGIakUVWo3IGCTRWqCAt6X4jGEIUAxgN0eYWnp+LkpHQAg/PsO90ELsy0Npm/n2HbtPndFgGEV31R9OmT4O4nrddjc3Qt6nWscx7e+WRHq5UnOudtjw5skuV09pFhvmqnOEIs4ljPeel1wfLYUF4\n"
|
||||
|
||||
def get_salt():
|
||||
"""Generate cryptographic salt."""
|
||||
millis = floor(time.time() * 1000)
|
||||
tail = "{:06d}".format(random.randint(0, 999999))
|
||||
return "{}{}".format(millis, tail)
|
||||
|
||||
|
||||
def get_vk2(params_dict, cltp):
|
||||
"""Generate salted hash of API parameters."""
|
||||
#params_dict["cltp"] = cltp
|
||||
query = ""
|
||||
for key, val in params_dict.items():
|
||||
if query:
|
||||
query += "&"
|
||||
query += key + "=" + str(val)
|
||||
#vdk = zlib.decompress(binascii.a2b_base64(VDKEY_B64Z))
|
||||
#query += vdk.decode("utf-8")
|
||||
query += VDKEY
|
||||
engine = hashlib.sha1()
|
||||
engine.update(bytes(query, "utf-8"))
|
||||
hexhash = engine.hexdigest()
|
||||
return hexhash
|
||||
|
||||
class CheckNewRequest(TclRequest):
|
||||
"""Generic update check request."""
|
||||
|
||||
def __init__(self, device: devices.Device):
|
||||
"""Populate variables.."""
|
||||
super().__init__()
|
||||
self.uri = "/check_new.php"
|
||||
self.method = "GET"
|
||||
self.device = device
|
||||
|
||||
def get_headers(self):
|
||||
"""Return request headers."""
|
||||
return {"User-Agent": "GOTU Client v10.1.1"}
|
||||
|
||||
def get_params(self):
|
||||
"""Return request parameters."""
|
||||
params = OrderedDict()
|
||||
params["id"] = self.device.imei
|
||||
params["salt"] = get_salt()
|
||||
params["curef"] = self.device.curef
|
||||
params["fv"] = self.device.fwver
|
||||
params["type"] = self.device.type
|
||||
params["mode"] = self.device.mode
|
||||
params["cltp"] = self.device.cltp
|
||||
params["vk"] = get_vk2(params, self.device.cltp)
|
||||
#params["cktp"] = self.device.cktp
|
||||
#params["rtd"] = self.device.rtd
|
||||
#params["chnl"] = self.device.chnl
|
||||
#params["osvs"] = self.device.osvs
|
||||
#params["ckot"] = self.device.ckot
|
||||
print(repr(params))
|
||||
return params
|
||||
|
||||
def is_done(self, http_status: int, contents: str) -> bool:
|
||||
"""Handle request result."""
|
||||
ok_states = {
|
||||
204: "No update available.",
|
||||
404: "No data for requested CUREF/FV combination.",
|
||||
}
|
||||
if http_status == 200:
|
||||
self.response = contents
|
||||
self.result = CheckResult(contents)
|
||||
self.success = True
|
||||
return True
|
||||
elif http_status in ok_states:
|
||||
self.error = ok_states[http_status]
|
||||
self.success = False
|
||||
return True
|
||||
elif http_status not in [500, 502, 503]:
|
||||
# Errors OTHER than 500, 502 or 503 are probably
|
||||
# errors where we don't need to retry
|
||||
self.error = "HTTP {}.".format(http_status)
|
||||
self.success = False
|
||||
return True
|
||||
return False
|
||||
|
||||
# Check requests have 4 possible outcomes:
|
||||
# 1. HTTP 200 with XML data - our desired info
|
||||
# 2. HTTP 204 - means: no newer update available
|
||||
# 3. HTTP 404 - means: invalid device or firmware version
|
||||
# 4. anything else: server problem (esp. 500, 502, 503)
|
||||
@@ -1,21 +1,31 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""Generic update check request."""
|
||||
|
||||
from collections import OrderedDict
|
||||
|
||||
from .. import devices
|
||||
from .tclrequest import TclRequest
|
||||
from .tclresult import CheckResult
|
||||
|
||||
|
||||
class CheckRequest(TclRequest):
|
||||
"""Generic update check request."""
|
||||
|
||||
def __init__(self, device: devices.Device):
|
||||
"""Populate variables.."""
|
||||
super().__init__()
|
||||
self.uri = "/check.php"
|
||||
self.method = "GET"
|
||||
self.device = device
|
||||
|
||||
|
||||
def get_headers(self):
|
||||
return {"User-Agent": self.device.ua}
|
||||
"""Return request headers."""
|
||||
return {"User-Agent": self.device.uagent}
|
||||
|
||||
def get_params(self):
|
||||
"""Return request parameters."""
|
||||
params = OrderedDict()
|
||||
params["id"] = self.device.imei
|
||||
params["curef"] = self.device.curef
|
||||
@@ -31,6 +41,7 @@ class CheckRequest(TclRequest):
|
||||
return params
|
||||
|
||||
def is_done(self, http_status: int, contents: str) -> bool:
|
||||
"""Handle request result."""
|
||||
ok_states = {
|
||||
204: "No update available.",
|
||||
404: "No data for requested CUREF/FV combination.",
|
||||
@@ -47,7 +58,7 @@ class CheckRequest(TclRequest):
|
||||
elif http_status not in [500, 502, 503]:
|
||||
# Errors OTHER than 500, 502 or 503 are probably
|
||||
# errors where we don't need to retry
|
||||
self.error ="HTTP {}.".format(http_status)
|
||||
self.error = "HTTP {}.".format(http_status)
|
||||
self.success = False
|
||||
return True
|
||||
return False
|
||||
|
||||
@@ -1,13 +1,21 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from collections import OrderedDict
|
||||
"""Generic file checksum request."""
|
||||
|
||||
import json
|
||||
from .. import credentials, devices
|
||||
from collections import OrderedDict
|
||||
|
||||
from .. import credentials
|
||||
from .tclrequest import TclRequest
|
||||
from .tclresult import ChecksumResult
|
||||
|
||||
|
||||
class ChecksumRequest(TclRequest):
|
||||
"""Generic file checksum request."""
|
||||
|
||||
def __init__(self, address, file_uri):
|
||||
"""Populate variables."""
|
||||
super().__init__()
|
||||
# NOTE: THIS HAS TO BE RUN ON AN ENCSLAVE
|
||||
self.uri = "/checksum.php"
|
||||
@@ -16,9 +24,11 @@ class ChecksumRequest(TclRequest):
|
||||
self.file_uri = file_uri
|
||||
|
||||
def get_headers(self):
|
||||
"""Return request headers."""
|
||||
return {"User-Agent": "tcl"}
|
||||
|
||||
def get_params(self):
|
||||
"""Return request parameters."""
|
||||
params = OrderedDict()
|
||||
params.update(credentials.get_creds2())
|
||||
payload = {self.address: self.file_uri}
|
||||
@@ -27,6 +37,7 @@ class ChecksumRequest(TclRequest):
|
||||
return params
|
||||
|
||||
def is_done(self, http_status: int, contents: str) -> bool:
|
||||
"""Handle request result."""
|
||||
if http_status == 200:
|
||||
# <ENCRYPT_FOOTER>2abfa6f6507044fec995efede5d818e62a0b19b5</ENCRYPT_FOOTER> means ERROR (invalid ADDRESS!)
|
||||
if "<ENCRYPT_FOOTER>2abfa6f6507044fec995efede5d818e62a0b19b5</ENCRYPT_FOOTER>" in contents:
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""Generic file download request."""
|
||||
|
||||
import binascii
|
||||
import hashlib
|
||||
import random
|
||||
@@ -7,6 +10,7 @@ import time
|
||||
import zlib
|
||||
from collections import OrderedDict
|
||||
from math import floor
|
||||
|
||||
from .. import devices
|
||||
from .tclrequest import TclRequest
|
||||
from .tclresult import DownloadResult
|
||||
@@ -21,6 +25,7 @@ def get_salt():
|
||||
tail = "{:06d}".format(random.randint(0, 999999))
|
||||
return "{}{}".format(str(millis), tail)
|
||||
|
||||
|
||||
def get_vk2(params_dict, cltp):
|
||||
"""Generate salted hash of API parameters."""
|
||||
params_dict["cltp"] = cltp
|
||||
@@ -36,8 +41,12 @@ def get_vk2(params_dict, cltp):
|
||||
hexhash = engine.hexdigest()
|
||||
return hexhash
|
||||
|
||||
|
||||
class DownloadRequest(TclRequest):
|
||||
"""Generic file download request."""
|
||||
|
||||
def __init__(self, device: devices.Device, tvver: str, fw_id: str):
|
||||
"""Populate variables."""
|
||||
super().__init__()
|
||||
self.uri = "/download_request.php"
|
||||
self.method = "POST"
|
||||
@@ -46,9 +55,11 @@ class DownloadRequest(TclRequest):
|
||||
self.fw_id = fw_id
|
||||
|
||||
def get_headers(self):
|
||||
return {"User-Agent": self.device.ua}
|
||||
"""Return request headers."""
|
||||
return {"User-Agent": self.device.uagent}
|
||||
|
||||
def get_params(self):
|
||||
"""Return request parameters."""
|
||||
params = OrderedDict()
|
||||
params["id"] = self.device.imei
|
||||
params["salt"] = get_salt()
|
||||
@@ -68,6 +79,7 @@ class DownloadRequest(TclRequest):
|
||||
return params
|
||||
|
||||
def is_done(self, http_status: int, contents: str) -> bool:
|
||||
"""Handle request result."""
|
||||
if http_status == 200:
|
||||
self.response = contents
|
||||
self.result = DownloadResult(contents)
|
||||
|
||||
@@ -1,58 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# pylint: disable=C0111,C0326,C0103
|
||||
|
||||
"""Tools to manage dumps of API requests."""
|
||||
|
||||
import errno
|
||||
import glob
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
from math import floor
|
||||
|
||||
from .. import ansi
|
||||
|
||||
|
||||
def get_timestamp_random():
|
||||
"""Generate timestamp + random part to avoid collisions."""
|
||||
millis = floor(time.time() * 1000)
|
||||
tail = "{:06d}".format(random.randint(0, 999999))
|
||||
return "{}_{}".format(str(millis), tail)
|
||||
|
||||
def write_info_if_dumps_found():
|
||||
"""Notify user to upload dumps if present."""
|
||||
# To disable this info, uncomment the following line.
|
||||
# return
|
||||
files = glob.glob(os.path.normpath("logs/*.xml"))
|
||||
if files:
|
||||
print()
|
||||
print("{}There are {} logs collected in the logs/ directory.{} Please consider uploading".format(ansi.YELLOW, len(files), ansi.RESET))
|
||||
print("them to https://tclota.birth-online.de/ by running {}./upload_logs.py{}.".format(ansi.CYAN, ansi.RESET))
|
||||
|
||||
class DumpMgr:
|
||||
"""A class for XML dump management."""
|
||||
|
||||
def __init__(self):
|
||||
"""Populate dump file name."""
|
||||
self.last_dump_filename = None
|
||||
|
||||
def write_dump(self, data):
|
||||
"""Write dump to file."""
|
||||
outfile = os.path.normpath("logs/{}.xml".format(get_timestamp_random()))
|
||||
if not os.path.exists(os.path.dirname(outfile)):
|
||||
try:
|
||||
os.makedirs(os.path.dirname(outfile))
|
||||
except OSError as err:
|
||||
if err.errno != errno.EEXIST:
|
||||
raise
|
||||
with open(outfile, "w", encoding="utf-8") as fhandle:
|
||||
fhandle.write(data)
|
||||
self.last_dump_filename = outfile
|
||||
|
||||
def delete_last_dump(self):
|
||||
"""Delete last dump."""
|
||||
if self.last_dump_filename:
|
||||
os.unlink(self.last_dump_filename)
|
||||
self.last_dump_filename = None
|
||||
@@ -1,12 +1,20 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""Generic encrypted header download request."""
|
||||
|
||||
from collections import OrderedDict
|
||||
from .. import credentials, devices
|
||||
|
||||
from .. import credentials
|
||||
from .tclrequest import TclRequest
|
||||
from .tclresult import EncryptHeaderResult
|
||||
|
||||
|
||||
class EncryptHeaderRequest(TclRequest):
|
||||
"""Generic encrypted header download request."""
|
||||
|
||||
def __init__(self, file_uri):
|
||||
"""Populate variables."""
|
||||
super().__init__()
|
||||
# NOTE: THIS HAS TO BE RUN ON AN ENCSLAVE
|
||||
self.uri = "/encrypt_header.php"
|
||||
@@ -15,15 +23,18 @@ class EncryptHeaderRequest(TclRequest):
|
||||
self.file_uri = file_uri
|
||||
|
||||
def get_headers(self):
|
||||
"""Return request headers."""
|
||||
return {"User-Agent": "tcl"}
|
||||
|
||||
def get_params(self):
|
||||
"""Return request parameters."""
|
||||
params = OrderedDict()
|
||||
params.update(credentials.get_creds2())
|
||||
params["address"] = bytes(self.file_uri, "utf-8")
|
||||
return params
|
||||
|
||||
def is_done(self, http_status: int, contents: str) -> bool:
|
||||
"""Handle request result."""
|
||||
# Expect "HTTP 206 Partial Content" response
|
||||
if http_status == 206:
|
||||
self.result = EncryptHeaderResult(contents)
|
||||
|
||||
+15
-5
@@ -1,13 +1,21 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import requests
|
||||
"""Base HTTP requests."""
|
||||
|
||||
from collections import OrderedDict
|
||||
|
||||
import requests
|
||||
|
||||
|
||||
class TimeoutException(Exception):
|
||||
"""Ignore timeouts."""
|
||||
pass
|
||||
|
||||
|
||||
class HttpRequest:
|
||||
"""Provides all generic features for making HTTP GET requests"""
|
||||
|
||||
def __init__(self, url, timeout=10):
|
||||
self.url = url
|
||||
self.params = OrderedDict()
|
||||
@@ -23,16 +31,18 @@ class HttpRequest:
|
||||
"""Run query."""
|
||||
try:
|
||||
req = self.sess.get(self.url, params=self.params, timeout=self.timeout)
|
||||
except requests.exceptions.Timeout as e:
|
||||
raise TimeoutException(e)
|
||||
except requests.exceptions.Timeout as exc:
|
||||
raise TimeoutException(exc)
|
||||
return req
|
||||
|
||||
|
||||
class HttpPostRequest(HttpRequest):
|
||||
"""Provides all generic features for making HTTP POST requests"""
|
||||
|
||||
def run(self):
|
||||
"""Run query."""
|
||||
try:
|
||||
req = self.sess.post(self.url, data=self.params, timeout=self.timeout)
|
||||
except requests.exceptions.Timeout as e:
|
||||
raise TimeoutException(e)
|
||||
except requests.exceptions.Timeout as exc:
|
||||
raise TimeoutException(exc)
|
||||
return req
|
||||
|
||||
@@ -1,20 +1,28 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""Generic request executors."""
|
||||
|
||||
from . import http, serverselector
|
||||
from .tclrequest import TclRequest
|
||||
from . import http
|
||||
from . import serverselector
|
||||
|
||||
|
||||
class UnknownMethodException(Exception):
|
||||
"""Ignore unknown methods."""
|
||||
pass
|
||||
|
||||
|
||||
class RequestRunner:
|
||||
"""Generic request executor."""
|
||||
|
||||
def __init__(self, server_selector: serverselector.ServerSelector, https=True):
|
||||
"""Populate variables."""
|
||||
self.server_selector = server_selector
|
||||
self.protocol = "https://" if https else "http://"
|
||||
self.max_tries = 5
|
||||
|
||||
def get_http(self, method="GET") -> http.HttpRequest:
|
||||
"""Returns the http class according to desired method."""
|
||||
"""Return the http class according to desired method."""
|
||||
if method == "GET":
|
||||
return http.HttpRequest
|
||||
elif method == "POST":
|
||||
@@ -22,11 +30,11 @@ class RequestRunner:
|
||||
raise UnknownMethodException("Unknown http method: {}".format(method))
|
||||
|
||||
def get_server(self) -> str:
|
||||
"""Returns a master server."""
|
||||
"""Return a master server."""
|
||||
return self.server_selector.get_master_server()
|
||||
|
||||
def run(self, query: TclRequest, timeout: int=10) -> bool:
|
||||
"""Runs the actual query."""
|
||||
"""Run the actual query."""
|
||||
for _ in range(0, self.max_tries):
|
||||
url = "{}{}{}".format(self.protocol, self.get_server(), query.uri)
|
||||
http_handler = self.get_http(query.method)(url, timeout)
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# pylint: disable=C0111,C0326,C0103
|
||||
|
||||
"""Tools to sort API servers to find the least awful one."""
|
||||
|
||||
import numpy
|
||||
import time
|
||||
|
||||
import numpy
|
||||
|
||||
|
||||
MASTER_SERVERS = [
|
||||
"g2master-us-east.tclclouds.com",
|
||||
@@ -17,6 +19,7 @@ MASTER_SERVERS = [
|
||||
"g2master-sa-east.tclclouds.com",
|
||||
]
|
||||
|
||||
|
||||
class ServerSelector:
|
||||
"""Returns a random server to use."""
|
||||
|
||||
@@ -45,6 +48,7 @@ class ServerSelector:
|
||||
"""Hook to be called after request finished"""
|
||||
pass
|
||||
|
||||
|
||||
class ServerVoteSelector(ServerSelector):
|
||||
"""Tries to return faster servers more often."""
|
||||
|
||||
|
||||
@@ -1,9 +1,16 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""Generic TCL request object."""
|
||||
|
||||
from . import tclresult
|
||||
|
||||
|
||||
class TclRequest:
|
||||
"""Generic TCL request object."""
|
||||
|
||||
def __init__(self):
|
||||
"""Populate variables."""
|
||||
self.uri = ""
|
||||
self.rawmode = False
|
||||
self.response = None
|
||||
@@ -12,15 +19,17 @@ class TclRequest:
|
||||
self.success = False
|
||||
|
||||
def get_headers(self):
|
||||
"""Return request headers."""
|
||||
return {}
|
||||
|
||||
def get_params(self):
|
||||
"""Return request parameters."""
|
||||
return {}
|
||||
|
||||
def is_done(self, http_status: int, contents: str):
|
||||
"""Checks if query is done or needs retry."""
|
||||
"""Check if query is done or needs retry."""
|
||||
return False
|
||||
|
||||
def get_result(self) -> tclresult.TclResult:
|
||||
"""Returns Result object."""
|
||||
"""Return Result object."""
|
||||
return self.result
|
||||
|
||||
@@ -1,19 +1,26 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""Generic TCL API result handlers."""
|
||||
|
||||
import xml.dom.minidom
|
||||
|
||||
from defusedxml import ElementTree
|
||||
|
||||
from . import dumpmgr
|
||||
from .. import dumpmgr
|
||||
|
||||
|
||||
class TclResult:
|
||||
def __init__(self, xml: str):
|
||||
self.raw_xml = xml
|
||||
"""Generic TCL API result."""
|
||||
|
||||
def __init__(self, xmlstr: str):
|
||||
"""Populate variables."""
|
||||
self.raw_xml = xmlstr
|
||||
self.dumper = dumpmgr.DumpMgr()
|
||||
self.dumper.write_dump(xml)
|
||||
self.dumper.write_dump(xmlstr)
|
||||
|
||||
def delete_dump(self):
|
||||
"""Delete last dump."""
|
||||
self.dumper.delete_last_dump()
|
||||
|
||||
def pretty_xml(self):
|
||||
@@ -21,10 +28,14 @@ class TclResult:
|
||||
mdx = xml.dom.minidom.parseString(self.raw_xml)
|
||||
return mdx.toprettyxml(indent=" ")
|
||||
|
||||
|
||||
class CheckResult(TclResult):
|
||||
def __init__(self, xml: str):
|
||||
super().__init__(xml)
|
||||
root = ElementTree.fromstring(xml)
|
||||
"""Handle check request result."""
|
||||
|
||||
def __init__(self, xmlstr: str):
|
||||
"""Extract data from check request result."""
|
||||
super().__init__(xmlstr)
|
||||
root = ElementTree.fromstring(xmlstr)
|
||||
self.curef = root.find("CUREF").text
|
||||
self.fvver = root.find("VERSION").find("FV").text
|
||||
self.tvver = root.find("VERSION").find("TV").text
|
||||
@@ -35,10 +46,14 @@ class CheckResult(TclResult):
|
||||
self.filesize = fileinfo.find("SIZE").text
|
||||
self.filehash = fileinfo.find("CHECKSUM").text
|
||||
|
||||
|
||||
class DownloadResult(TclResult):
|
||||
def __init__(self, xml: str):
|
||||
super().__init__(xml)
|
||||
root = ElementTree.fromstring(xml)
|
||||
"""Handle download request result."""
|
||||
|
||||
def __init__(self, xmlstr: str):
|
||||
"""Extract data from download request result."""
|
||||
super().__init__(xmlstr)
|
||||
root = ElementTree.fromstring(xmlstr)
|
||||
file = root.find("FILE_LIST").find("FILE")
|
||||
self.fileid = file.find("FILE_ID").text
|
||||
self.fileurl = file.find("DOWNLOAD_URL").text
|
||||
@@ -53,16 +68,24 @@ class DownloadResult(TclResult):
|
||||
self.encslaves = [s.text for s in enc_list]
|
||||
self.s3_slaves = [s.text for s in s3_slave_list]
|
||||
|
||||
|
||||
class ChecksumResult(TclResult):
|
||||
def __init__(self, xml: str):
|
||||
super().__init__(xml)
|
||||
root = ElementTree.fromstring(xml)
|
||||
"""Handle checksum request result."""
|
||||
|
||||
def __init__(self, xmlstr: str):
|
||||
"""Extract data from checksum request result."""
|
||||
super().__init__(xmlstr)
|
||||
root = ElementTree.fromstring(xmlstr)
|
||||
file = root.find("FILE_CHECKSUM_LIST").find("FILE")
|
||||
self.file_addr = file.find("ADDRESS").text
|
||||
self.sha1_enc_footer = file.find("ENCRYPT_FOOTER").text
|
||||
self.sha1_footer = file.find("FOOTER").text
|
||||
self.sha1_body = file.find("BODY").text
|
||||
|
||||
|
||||
class EncryptHeaderResult(TclResult):
|
||||
"""Handle encrypted header request result."""
|
||||
|
||||
def __init__(self, contents: str):
|
||||
"""Extract data from encrypted header request result."""
|
||||
self.rawdata = contents
|
||||
|
||||
@@ -1,66 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# pylint: disable=C0111,C0326,C0103
|
||||
|
||||
"""Tools to sort API servers to find the least awful one."""
|
||||
|
||||
import numpy
|
||||
|
||||
|
||||
class ServerVoteMixin:
|
||||
"""A mixin component for server sorting."""
|
||||
|
||||
def __init__(self):
|
||||
"""Populate server list and weighting variables."""
|
||||
self.g2master = None
|
||||
self.master_servers = [
|
||||
"g2master-us-east.tclclouds.com",
|
||||
"g2master-us-west.tclclouds.com",
|
||||
"g2master-eu-west.tclclouds.com",
|
||||
"g2master-ap-south.tclclouds.com",
|
||||
"g2master-ap-north.tclclouds.com",
|
||||
"g2master-sa-east.tclclouds.com",
|
||||
]
|
||||
self.master_servers_weights = [3] * len(self.master_servers)
|
||||
self.check_time_sum = 3
|
||||
self.check_time_count = 1
|
||||
|
||||
def get_master_server(self):
|
||||
"""Return weighted choice from server list."""
|
||||
weight_sum = 0
|
||||
for i in self.master_servers_weights:
|
||||
weight_sum += i
|
||||
numpy_weights = []
|
||||
for i in self.master_servers_weights:
|
||||
numpy_weights.append(i/weight_sum)
|
||||
return numpy.random.choice(self.master_servers, p=numpy_weights)
|
||||
|
||||
def master_server_downvote(self):
|
||||
"""Decrease weight of a server."""
|
||||
idx = self.master_servers.index(self.g2master)
|
||||
if self.master_servers_weights[idx] > 1:
|
||||
self.master_servers_weights[idx] -= 1
|
||||
|
||||
def master_server_upvote(self):
|
||||
"""Increase weight of a server."""
|
||||
idx = self.master_servers.index(self.g2master)
|
||||
if self.master_servers_weights[idx] < 10:
|
||||
self.master_servers_weights[idx] += 1
|
||||
|
||||
def check_time_add(self, duration):
|
||||
"""Record connection time."""
|
||||
self.check_time_sum += duration
|
||||
self.check_time_count += 1
|
||||
|
||||
def check_time_avg(self):
|
||||
"""Return average connection time."""
|
||||
return self.check_time_sum / self.check_time_count
|
||||
|
||||
def master_server_vote_on_time(self, last_duration):
|
||||
"""Change weight of a server based on average connection time."""
|
||||
avg_duration = self.check_time_avg()
|
||||
if last_duration < avg_duration - 0.5:
|
||||
self.master_server_upvote()
|
||||
elif last_duration > avg_duration + 0.5:
|
||||
self.master_server_downvote()
|
||||
@@ -1,94 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# pylint: disable=C0111,C0326,C0103
|
||||
|
||||
"""Tools to interface with TCL's update request API."""
|
||||
|
||||
import time
|
||||
from collections import OrderedDict, defaultdict
|
||||
|
||||
import requests
|
||||
from defusedxml import ElementTree
|
||||
|
||||
from .devices import Device
|
||||
|
||||
|
||||
class TclCheckMixin:
|
||||
"""A mixin component for TCL's update request API."""
|
||||
|
||||
def prep_check_url(self, https=True):
|
||||
"""Prepare URL for update request."""
|
||||
protocol = "https://" if https else "http://"
|
||||
url = protocol + self.g2master + "/check.php"
|
||||
return url
|
||||
|
||||
def prep_check(self, device: Device, https=True):
|
||||
"""Prepare URL and parameters for update request."""
|
||||
url = self.prep_check_url(https)
|
||||
params = OrderedDict()
|
||||
params["id"] = device.imei
|
||||
params["curef"] = device.curef
|
||||
params["fv"] = device.fwver
|
||||
params["mode"] = device.mode
|
||||
params["type"] = device.type
|
||||
params["cltp"] = device.cltp
|
||||
params["cktp"] = device.cktp
|
||||
params["rtd"] = device.rtd
|
||||
params["chnl"] = device.chnl
|
||||
#params["osvs"] = device.osvs
|
||||
#params["ckot"] = device.ckot
|
||||
return url, params
|
||||
|
||||
def do_check(self, device: Device, https=True, timeout=10, max_tries=5):
|
||||
"""Perform update request with given parameters."""
|
||||
url, params = self.prep_check(device, https)
|
||||
last_response = None
|
||||
for _ in range(0, max_tries):
|
||||
try:
|
||||
reqtime_start = time.perf_counter()
|
||||
req = self.sess.get(url, params=params, timeout=timeout)
|
||||
reqtime = time.perf_counter() - reqtime_start
|
||||
self.check_time_add(reqtime)
|
||||
last_response = req
|
||||
if req.status_code == 200:
|
||||
self.master_server_vote_on_time(reqtime)
|
||||
req.encoding = "utf-8" # Force encoding as server doesn't give one
|
||||
self.write_dump(req.text)
|
||||
return req.text
|
||||
elif req.status_code not in [500, 502, 503]:
|
||||
self.do_check_errorhandle(req, reqtime)
|
||||
except requests.exceptions.Timeout:
|
||||
pass
|
||||
# Something went wrong, try a different server
|
||||
self.master_server_downvote()
|
||||
self.g2master = self.get_master_server()
|
||||
url = self.prep_check_url(https)
|
||||
raise requests.exceptions.RetryError("Max tries ({}) reached.".format(max_tries), response=last_response)
|
||||
|
||||
def do_check_errorhandle(self, req, reqtime):
|
||||
"""Handle non-HTTP 200 results for ``do_check``."""
|
||||
errcodes = defaultdict(lambda: "HTTP {}.".format(req.status_code))
|
||||
errcodes[204] = "No update available."
|
||||
errcodes[404] = "No data for requested CUREF/FV combination."
|
||||
if req.status_code in [204, 404]:
|
||||
self.master_server_vote_on_time(reqtime)
|
||||
elif req.status_code not in [500, 502, 503]:
|
||||
self.master_server_downvote()
|
||||
req.raise_for_status()
|
||||
raise requests.exceptions.HTTPError(errcodes[req.status_code], response=req)
|
||||
|
||||
@staticmethod
|
||||
def parse_check(xmlstr):
|
||||
"""Parse output of ``do_check``."""
|
||||
root = ElementTree.fromstring(xmlstr)
|
||||
curef = root.find("CUREF").text
|
||||
fvver = root.find("VERSION").find("FV").text
|
||||
tvver = root.find("VERSION").find("TV").text
|
||||
fw_id = root.find("FIRMWARE").find("FW_ID").text
|
||||
fileinfo = root.find("FIRMWARE").find("FILESET").find("FILE")
|
||||
fileid = fileinfo.find("FILE_ID").text
|
||||
filename = fileinfo.find("FILENAME").text
|
||||
filesize = fileinfo.find("SIZE").text
|
||||
filehash = fileinfo.find("CHECKSUM").text
|
||||
return curef, fvver, tvver, fw_id, fileid, filename, filesize, filehash
|
||||
@@ -1,56 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# pylint: disable=C0111,C0326,C0103
|
||||
|
||||
"""Tools to interface with TCL's checksum API."""
|
||||
|
||||
import json
|
||||
|
||||
from defusedxml import ElementTree
|
||||
|
||||
from . import credentials
|
||||
|
||||
|
||||
class TclChecksumMixin:
|
||||
"""A mixin component for TCL's checksum API."""
|
||||
|
||||
@staticmethod
|
||||
def prep_checksum(encslave, address, uri):
|
||||
"""Prepare URL and parameters for checksum request."""
|
||||
url = "http://" + encslave + "/checksum.php"
|
||||
params = credentials.get_creds2()
|
||||
payload = {address: uri}
|
||||
payload_json = json.dumps(payload)
|
||||
params[b"address"] = bytes(payload_json, "utf-8")
|
||||
return url, params
|
||||
|
||||
def do_checksum(self, encslave, address, uri):
|
||||
"""Perform checksum request with given parameters."""
|
||||
url, params = self.prep_checksum(encslave, address, uri)
|
||||
# print(repr(dict(params)))
|
||||
req = self.sess.post(url, data=params)
|
||||
if req.status_code == 200:
|
||||
req.encoding = "utf-8" # Force encoding as server doesn't give one
|
||||
self.write_dump(req.text)
|
||||
# <ENCRYPT_FOOTER>2abfa6f6507044fec995efede5d818e62a0b19b5</ENCRYPT_FOOTER> means ERROR (invalid ADDRESS!)
|
||||
if "<ENCRYPT_FOOTER>2abfa6f6507044fec995efede5d818e62a0b19b5</ENCRYPT_FOOTER>" in req.text:
|
||||
print("INVALID URI: {}".format(uri))
|
||||
raise SystemExit
|
||||
return req.text
|
||||
else:
|
||||
print("CHECKSUM: " + repr(req))
|
||||
print(repr(req.headers))
|
||||
print(repr(req.text))
|
||||
raise SystemExit
|
||||
|
||||
@staticmethod
|
||||
def parse_checksum(xmlstr):
|
||||
"""Parse output of ``do_checksum``."""
|
||||
root = ElementTree.fromstring(xmlstr)
|
||||
file = root.find("FILE_CHECKSUM_LIST").find("FILE")
|
||||
file_addr = file.find("ADDRESS").text
|
||||
sha1_enc_footer = file.find("ENCRYPT_FOOTER").text
|
||||
sha1_footer = file.find("FOOTER").text
|
||||
sha1_body = file.find("BODY").text
|
||||
return file_addr, sha1_body, sha1_enc_footer, sha1_footer
|
||||
@@ -1,27 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# pylint: disable=C0111,C0326,C0103
|
||||
|
||||
"""Tools to interface with TCL's encrypted header API."""
|
||||
|
||||
from . import credentials
|
||||
|
||||
|
||||
class TclEncHeaderMixin:
|
||||
"""A mixin component for TCL's encrypted header API.."""
|
||||
|
||||
def do_encrypt_header(self, encslave, address):
|
||||
"""Perform encrypted header request with given parameters."""
|
||||
params = credentials.get_creds2()
|
||||
params[b"address"] = bytes(address, "utf-8")
|
||||
url = "http://" + encslave + "/encrypt_header.php"
|
||||
req = self.sess.post(url, data=params, verify=False)
|
||||
# Expect "HTTP 206 Partial Content" response
|
||||
if req.status_code == 206:
|
||||
return req.content
|
||||
else:
|
||||
print("ENCRYPT: " + repr(req))
|
||||
print(repr(req.headers))
|
||||
print(repr(req.text))
|
||||
raise SystemExit
|
||||
@@ -1,123 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# pylint: disable=C0111,C0326,C0103
|
||||
|
||||
"""Tools to interface with TCL's download request API."""
|
||||
|
||||
import binascii
|
||||
import hashlib
|
||||
import random
|
||||
import time
|
||||
import zlib
|
||||
from collections import OrderedDict
|
||||
from math import floor
|
||||
|
||||
from defusedxml import ElementTree
|
||||
|
||||
|
||||
'''
|
||||
private HashMap<String, String> buildDownloadUrisParams(UpdatePackageInfo updatePackageInfo) {
|
||||
FotaLog.m28v(TAG, "doAfterCheck");
|
||||
String salt = FotaUtil.salt();
|
||||
HashMap linkedHashMap = new LinkedHashMap();
|
||||
linkedHashMap.put("id", this.internalBuilder.getParam("id"));
|
||||
linkedHashMap.put("salt", salt);
|
||||
linkedHashMap.put("curef", updatePackageInfo.mCuref);
|
||||
linkedHashMap.put("fv", updatePackageInfo.mFv);
|
||||
linkedHashMap.put("tv", updatePackageInfo.mTv);
|
||||
linkedHashMap.put("type", "Firmware");
|
||||
linkedHashMap.put("fw_id", updatePackageInfo.mFirmwareId);
|
||||
linkedHashMap.put("mode", "2");
|
||||
linkedHashMap.put("vk", generateVk2((LinkedHashMap) linkedHashMap.clone()));
|
||||
linkedHashMap.put("cltp", "10");
|
||||
linkedHashMap.put("cktp", this.internalBuilder.getParam("cktp"));
|
||||
linkedHashMap.put("rtd", this.internalBuilder.getParam("rtd"));
|
||||
linkedHashMap.put("chnl", this.internalBuilder.getParam("chnl"));
|
||||
return linkedHashMap;
|
||||
}
|
||||
'''
|
||||
|
||||
VDKEY_B64Z = b"eJwdjwEOwDAIAr8kKFr//7HhmqXp8AIIDrYAgg8byiUXrwRJRXja+d6iNxu0AhUooDCN9rd6rDLxmGIakUVWo3IGCTRWqCAt6X4jGEIUAxgN0eYWnp+LkpHQAg/PsO90ELsy0Npm/n2HbtPndFgGEV31R9OmT4O4nrddjc3Qt6nWscx7e+WRHq5UnOudtjw5skuV09pFhvmqnOEIs4ljPeel1wfLYUF4\n"
|
||||
|
||||
|
||||
def get_salt():
|
||||
"""Generate cryptographic salt."""
|
||||
millis = floor(time.time() * 1000)
|
||||
tail = "{:06d}".format(random.randint(0, 999999))
|
||||
return "{}{}".format(str(millis), tail)
|
||||
|
||||
|
||||
def get_vk2(params_dict, cltp):
|
||||
"""Generate salted hash of API parameters."""
|
||||
params_dict["cltp"] = cltp
|
||||
query = ""
|
||||
for key, val in params_dict.items():
|
||||
if query:
|
||||
query += "&"
|
||||
query += key + "=" + str(val)
|
||||
vdk = zlib.decompress(binascii.a2b_base64(VDKEY_B64Z))
|
||||
query += vdk.decode("utf-8")
|
||||
engine = hashlib.sha1()
|
||||
engine.update(bytes(query, "utf-8"))
|
||||
hexhash = engine.hexdigest()
|
||||
return hexhash
|
||||
|
||||
|
||||
class TclRequestMixin:
|
||||
"""A mixin component for TCL's download request API."""
|
||||
|
||||
def prep_request(self, curef, fvver, tvver, fw_id):
|
||||
"""Prepare URL and device parameters for download request."""
|
||||
url = "https://" + self.g2master + "/download_request.php"
|
||||
params = OrderedDict()
|
||||
params["id"] = self.serid
|
||||
params["salt"] = get_salt()
|
||||
params["curef"] = curef
|
||||
params["fv"] = fvver
|
||||
params["tv"] = tvver
|
||||
params["type"] = self.ftype
|
||||
params["fw_id"] = fw_id
|
||||
params["mode"] = self.mode.value
|
||||
params["vk"] = get_vk2(params, self.cltp.value)
|
||||
params["cltp"] = self.cltp.value
|
||||
params["cktp"] = self.cktp.value
|
||||
params["rtd"] = self.rtd.value
|
||||
if self.mode == self.MODE.FULL:
|
||||
params["foot"] = 1
|
||||
params["chnl"] = self.chnl.value
|
||||
return url, params
|
||||
|
||||
def do_request(self, curef, fvver, tvver, fw_id):
|
||||
"""Perform download request with given parameters."""
|
||||
url, params = self.prep_request(curef, fvver, tvver, fw_id)
|
||||
# print(repr(dict(params)))
|
||||
req = self.sess.post(url, data=params)
|
||||
if req.status_code == 200:
|
||||
req.encoding = "utf-8" # Force encoding as server doesn't give one
|
||||
self.write_dump(req.text)
|
||||
return req.text
|
||||
else:
|
||||
print("REQUEST: " + repr(req))
|
||||
print(repr(req.headers))
|
||||
print(repr(req.text))
|
||||
raise SystemExit
|
||||
|
||||
@staticmethod
|
||||
def parse_request(xmlstr):
|
||||
"""Parse output of ``do_request``."""
|
||||
root = ElementTree.fromstring(xmlstr)
|
||||
file = root.find("FILE_LIST").find("FILE")
|
||||
fileid = file.find("FILE_ID").text
|
||||
fileurl = file.find("DOWNLOAD_URL").text
|
||||
s3_fileurl_node = file.find("S3_DOWNLOAD_URL")
|
||||
s3_fileurl = ""
|
||||
if s3_fileurl_node:
|
||||
s3_fileurl = s3_fileurl_node.text
|
||||
slave_list = root.find("SLAVE_LIST").findall("SLAVE")
|
||||
enc_list = root.find("SLAVE_LIST").findall("ENCRYPT_SLAVE")
|
||||
s3_slave_list = root.find("SLAVE_LIST").findall("S3_SLAVE")
|
||||
slaves = [s.text for s in slave_list]
|
||||
encslaves = [s.text for s in enc_list]
|
||||
s3_slaves = [s.text for s in s3_slave_list]
|
||||
return fileid, fileurl, slaves, encslaves, s3_fileurl, s3_slaves
|
||||
@@ -1,14 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# pylint: disable=C0111,C0326,C0103
|
||||
|
||||
"""XML tools."""
|
||||
|
||||
import xml.dom.minidom
|
||||
|
||||
|
||||
def pretty_xml(xmlstr):
|
||||
"""Prettify input XML with ``xml.dom.minidom``."""
|
||||
mdx = xml.dom.minidom.parseString(xmlstr)
|
||||
return mdx.toprettyxml(indent=" ")
|
||||
Executable
+22
@@ -0,0 +1,22 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# pylint: disable=C0111,C0326,C0103
|
||||
|
||||
"""Update PRD database."""
|
||||
|
||||
import sys
|
||||
|
||||
from tcllib import argparser, devlist
|
||||
|
||||
|
||||
dpdesc = """
|
||||
Updates PRD software database if local copy is outdated.
|
||||
"""
|
||||
dp = argparser.DefaultParser(__file__, dpdesc)
|
||||
dp.add_argument("-f", "--force", help="force database update", dest="force", action="store_true", default=False)
|
||||
args = dp.parse_args(sys.argv[1:])
|
||||
|
||||
print("Updating device database...")
|
||||
prds = devlist.get_devicelist(force=args.force)
|
||||
del prds
|
||||
Regular → Executable
+9
@@ -9,9 +9,18 @@
|
||||
|
||||
import glob
|
||||
import os
|
||||
import sys
|
||||
|
||||
import requests
|
||||
|
||||
from tcllib import argparser
|
||||
dpdesc = """
|
||||
Uploads contents of logs folder to remote database.
|
||||
"""
|
||||
dp = argparser.DefaultParser(__file__, dpdesc)
|
||||
args = dp.parse_args(sys.argv[1:])
|
||||
del args
|
||||
|
||||
|
||||
# This is the URL to an installation of https://github.com/mbirth/tcl_update_db
|
||||
UPLOAD_URL = "https://tclota.birth-online.de/"
|
||||
|
||||
Reference in New Issue
Block a user