mirror of
https://github.com/mbirth/tcl_ota_check.git
synced 2024-11-09 22:06:47 +00:00
pylint, autopep8, isort runthroughs
This commit is contained in:
parent
42566d1195
commit
da3663486a
11
tclcheck.py
11
tclcheck.py
@ -6,12 +6,14 @@
|
||||
import os
|
||||
import random
|
||||
import sys
|
||||
|
||||
import tcllib
|
||||
import tcllib.argparser
|
||||
|
||||
|
||||
fc = tcllib.FotaCheck()
|
||||
fc.serid = "3531510"
|
||||
#fc.osvs = "7.1.1"
|
||||
#fc.osvs = "7.1.1"
|
||||
|
||||
dpdesc = """
|
||||
Checks for the latest FULL updates for the specified PRD number or for an OTA from the
|
||||
@ -21,12 +23,13 @@ dp = tcllib.argparser.DefaultParser(__file__, dpdesc)
|
||||
dp.add_argument("prd", nargs=1, help="CU Reference #, e.g. PRD-63117-011")
|
||||
dp.add_argument("fvver", nargs="?", help="Firmware version to check for OTA updates, e.g. AAM481 (omit to run FULL check)", default="AAA000")
|
||||
dp.add_argument("-i", "--imei", help="use specified IMEI instead of default", type=str)
|
||||
dp.add_argument("-m", "--mode", help="force type of update to check for", default="auto", type=str ,choices=["full", "ota"])
|
||||
dp.add_argument("-m", "--mode", help="force type of update to check for", default="auto", type=str, choices=["full", "ota"])
|
||||
dp.add_argument("-t", "--type", help="force type of check to run", default="auto", type=str, choices=["desktop", "mobile"])
|
||||
dp.add_argument("--rawmode", help="override --mode with raw value (2=OTA, 4=FULL)", metavar="MODE")
|
||||
dp.add_argument("--rawcltp", help="override --type with raw value (10=MOBILE, 2010=DESKTOP)", metavar="CLTP")
|
||||
args = dp.parse_args(sys.argv[1:])
|
||||
|
||||
|
||||
def sel_mode(txtmode, autoval, rawval):
|
||||
if rawval:
|
||||
enum = tcllib.default_enum("MODE", {"RAW": rawval})
|
||||
@ -37,6 +40,7 @@ def sel_mode(txtmode, autoval, rawval):
|
||||
return fc.MODE.OTA
|
||||
return fc.MODE.FULL
|
||||
|
||||
|
||||
def sel_cltp(txtmode, autoval, rawval):
|
||||
if rawval:
|
||||
enum = tcllib.default_enum("CLTP", {"RAW": rawval})
|
||||
@ -47,6 +51,7 @@ def sel_cltp(txtmode, autoval, rawval):
|
||||
return fc.CLTP.DESKTOP
|
||||
return fc.CLTP.MOBILE
|
||||
|
||||
|
||||
if args.imei:
|
||||
print("Use specified IMEI: {}".format(args.imei))
|
||||
fc.serid = args.imei
|
||||
@ -71,7 +76,7 @@ req_xml = fc.do_request(curef, fv, tv, fw_id)
|
||||
print(fc.pretty_xml(req_xml))
|
||||
fileid, fileurl, slaves, encslaves, s3_fileurl, s3_slaves = fc.parse_request(req_xml)
|
||||
|
||||
if len(encslaves) > 0:
|
||||
if encslaves:
|
||||
chksum_xml = fc.do_checksum(random.choice(encslaves), fileurl, fileurl)
|
||||
print(fc.pretty_xml(chksum_xml))
|
||||
file_addr, sha1_body, sha1_enc_footer, sha1_footer = fc.parse_checksum(chksum_xml)
|
||||
|
@ -4,10 +4,13 @@
|
||||
# pylint: disable=C0111,C0326,C0103
|
||||
|
||||
import sys
|
||||
|
||||
from requests.exceptions import RequestException
|
||||
|
||||
import tcllib
|
||||
import tcllib.argparser
|
||||
from tcllib import ansi
|
||||
from requests.exceptions import RequestException, Timeout
|
||||
|
||||
|
||||
fc = tcllib.FotaCheck()
|
||||
fc.serid = "3531510"
|
||||
@ -23,8 +26,8 @@ dp.add_argument("-p", "--prd", help="CU Reference # to filter scan results", des
|
||||
args = dp.parse_args(sys.argv[1:])
|
||||
|
||||
# CLTP = 10 (only show actual updates or HTTP 206) / 2010 (always show latest version for MODE.FULL)
|
||||
#fc.cltp = fc.CLTP.MOBILE
|
||||
fc.cltp = fc.CLTP.DESKTOP
|
||||
#fc.cltp = fc.CLTP.MOBILE
|
||||
fc.cltp = fc.CLTP.DESKTOP
|
||||
|
||||
prdcheck = "" if args.tocheck is None else args.tocheck
|
||||
|
||||
|
@ -3,19 +3,20 @@
|
||||
|
||||
# pylint: disable=C0111,C0326,C0103
|
||||
|
||||
import json
|
||||
import requests
|
||||
import sys
|
||||
|
||||
from requests.exceptions import RequestException
|
||||
|
||||
import tcllib
|
||||
import tcllib.argparser
|
||||
from tcllib import ansi
|
||||
from requests.exceptions import RequestException
|
||||
|
||||
|
||||
fc = tcllib.FotaCheck()
|
||||
fc.serid = "3531510"
|
||||
#fc.osvs = "7.1.1"
|
||||
#fc.osvs = "7.1.1"
|
||||
fc.mode = fc.MODE.OTA
|
||||
fc.cltp = fc.CLTP.MOBILE
|
||||
fc.cltp = fc.CLTP.MOBILE
|
||||
|
||||
dpdesc = """
|
||||
Checks for the latest OTA updates for all PRD numbers or only for the PRD specified
|
||||
@ -31,7 +32,7 @@ if args.forcever is not None:
|
||||
else:
|
||||
force_ver_text = ""
|
||||
|
||||
prdcheck = "" if args.tocheck is None else args.tocheck
|
||||
prdcheck = "" if args.tocheck is None else args.tocheck
|
||||
|
||||
print("Loading list of devices.")
|
||||
prds = tcllib.FotaCheck.get_devicelist()
|
||||
@ -39,9 +40,9 @@ prds = tcllib.FotaCheck.get_devicelist()
|
||||
print("List of latest OTA firmware{} by PRD:".format(force_ver_text))
|
||||
|
||||
for prd, variant in prds.items():
|
||||
model = variant["variant"]
|
||||
model = variant["variant"]
|
||||
lastver = variant["last_ota"]
|
||||
if lastver is None: lastver = variant["last_full"]
|
||||
lastver = variant["last_full"] if lastver is None else lastver
|
||||
if args.forcever is not None:
|
||||
lastver = args.forcever
|
||||
if prdcheck in prd:
|
||||
|
@ -5,10 +5,13 @@
|
||||
|
||||
import collections
|
||||
import sys
|
||||
|
||||
from requests.exceptions import RequestException, Timeout
|
||||
|
||||
import tcllib
|
||||
import tcllib.argparser
|
||||
from tcllib import ansi
|
||||
from requests.exceptions import RequestException, Timeout
|
||||
|
||||
|
||||
fc = tcllib.FotaCheck()
|
||||
fc.serid = "3531510"
|
||||
@ -16,8 +19,8 @@ fc.fv = "AAA000"
|
||||
fc.mode = fc.MODE.FULL
|
||||
|
||||
# CLTP = 10 (only show actual updates or HTTP 206) / 2010 (always show latest version for MODE.FULL)
|
||||
#fc.cltp = fc.CLTP.MOBILE
|
||||
fc.cltp = fc.CLTP.DESKTOP
|
||||
#fc.cltp = fc.CLTP.MOBILE
|
||||
fc.cltp = fc.CLTP.DESKTOP
|
||||
|
||||
dpdesc = """
|
||||
Finds new PRD numbers for all known variants, or specified variants with tocheck. Scan range
|
||||
|
@ -3,12 +3,14 @@
|
||||
|
||||
# pylint: disable=C0111,C0326,C0103
|
||||
|
||||
import collections
|
||||
import sys
|
||||
|
||||
from requests.exceptions import RequestException, Timeout
|
||||
|
||||
import tcllib
|
||||
import tcllib.argparser
|
||||
from tcllib import ansi
|
||||
from requests.exceptions import RequestException, Timeout
|
||||
|
||||
|
||||
# Variants to scan for
|
||||
SCAN_VARIANTS = ["001", "003", "009", "010", "700"]
|
||||
@ -19,8 +21,8 @@ fc.fv = "AAA000"
|
||||
fc.mode = fc.MODE.FULL
|
||||
|
||||
# CLTP = 10 (only show actual updates or HTTP 206) / 2010 (always show latest version for MODE.FULL)
|
||||
#fc.cltp = fc.CLTP.MOBILE
|
||||
fc.cltp = fc.CLTP.DESKTOP
|
||||
#fc.cltp = fc.CLTP.MOBILE
|
||||
fc.cltp = fc.CLTP.DESKTOP
|
||||
|
||||
dpdesc = """
|
||||
Finds new PRD numbers for a range of variants. Scan range can be set by
|
||||
|
@ -4,10 +4,13 @@
|
||||
# pylint: disable=C0111,C0326,C0103
|
||||
|
||||
import sys
|
||||
|
||||
from requests.exceptions import RequestException, Timeout
|
||||
|
||||
import tcllib
|
||||
import tcllib.argparser
|
||||
from tcllib import ansi
|
||||
from requests.exceptions import RequestException, Timeout
|
||||
|
||||
|
||||
fc = tcllib.FotaCheck()
|
||||
fc.serid = "3531510"
|
||||
|
@ -4,20 +4,22 @@
|
||||
# pylint: disable=C0111,C0326,C0103
|
||||
|
||||
import json
|
||||
|
||||
import requests
|
||||
import sys
|
||||
import tcllib
|
||||
from requests.exceptions import RequestException
|
||||
|
||||
import tcllib
|
||||
|
||||
|
||||
# 1. Fetch list of missing OTAs (e.g. from ancient versions to current)
|
||||
# 2. Query updates from FOTA servers (and store XML)
|
||||
# (3. Upload will be done manually with upload_logs.py)
|
||||
|
||||
fc = tcllib.FotaCheck()
|
||||
fc.serid = "3531510"
|
||||
#fc.osvs = "7.1.1"
|
||||
#fc.osvs = "7.1.1"
|
||||
fc.mode = fc.MODE.OTA
|
||||
fc.cltp = fc.CLTP.MOBILE
|
||||
fc.cltp = fc.CLTP.MOBILE
|
||||
|
||||
print("Loading list of missing OTAs.")
|
||||
versions_json = requests.get("https://tclota.birth-online.de/json_otaversions.php").text
|
||||
|
@ -3,12 +3,13 @@
|
||||
|
||||
# pylint: disable=C0111,C0326,C0103
|
||||
|
||||
import os
|
||||
import random
|
||||
import sys
|
||||
|
||||
import tcllib
|
||||
import tcllib.argparser
|
||||
|
||||
|
||||
fc = tcllib.FotaCheck()
|
||||
|
||||
encslaves = [
|
||||
|
11
tcldown.py
11
tcldown.py
@ -6,12 +6,14 @@
|
||||
import os
|
||||
import random
|
||||
import sys
|
||||
|
||||
import tcllib
|
||||
import tcllib.argparser
|
||||
|
||||
|
||||
fc = tcllib.FotaCheck()
|
||||
fc.serid = "3531510"
|
||||
#fc.osvs = "7.1.1"
|
||||
#fc.osvs = "7.1.1"
|
||||
|
||||
dpdesc = """
|
||||
Downloads the given firmware file.
|
||||
@ -27,12 +29,14 @@ dp.add_argument("--rawmode", help="override --mode with raw value (2=OTA, 4=FULL
|
||||
dp.add_argument("--rawcltp", help="override --type with raw value (10=MOBILE, 2010=DESKTOP)", metavar="CLTP")
|
||||
args = dp.parse_args(sys.argv[1:])
|
||||
|
||||
|
||||
def sel_mode(defaultmode, rawval):
|
||||
if rawval:
|
||||
enum = tcllib.default_enum("MODE", {"RAW": rawval})
|
||||
return enum.RAW
|
||||
return defaultmode
|
||||
|
||||
|
||||
def sel_cltp(txtmode, rawval):
|
||||
if rawval:
|
||||
enum = tcllib.default_enum("CLTP", {"RAW": rawval})
|
||||
@ -41,16 +45,17 @@ def sel_cltp(txtmode, rawval):
|
||||
return fc.CLTP.MOBILE
|
||||
return fc.CLTP.DESKTOP
|
||||
|
||||
|
||||
if args.imei:
|
||||
print("Use specified IMEI: {}".format(args.imei))
|
||||
fc.serid = args.imei
|
||||
|
||||
fc.curef = args.prd[0]
|
||||
if args.ota:
|
||||
fc.fv = args.ota[0]
|
||||
fc.fv = args.ota[0]
|
||||
fc.mode = sel_mode(fc.MODE.OTA, args.rawmode)
|
||||
else:
|
||||
fc.fv = args.targetversion[0]
|
||||
fc.fv = args.targetversion[0]
|
||||
fc.mode = sel_mode(fc.MODE.FULL, args.rawmode)
|
||||
fc.cltp = sel_cltp(args.type, args.rawcltp)
|
||||
|
||||
|
@ -3,19 +3,16 @@
|
||||
# pylint: disable=C0111,C0326
|
||||
|
||||
import enum
|
||||
import requests
|
||||
from . import credentials
|
||||
from . import devlist
|
||||
from . import dumpmgr
|
||||
from . import servervote
|
||||
from . import tclcheck
|
||||
from . import tclrequest
|
||||
from . import tclchecksum
|
||||
from . import tclencheader
|
||||
from . import xmltools
|
||||
|
||||
def default_enum(enumname, vardict):
|
||||
return enum.IntEnum(enumname, vardict, module=__name__, qualname="tcllib.FotaCheck.{}".format(enumname))
|
||||
import requests
|
||||
|
||||
from . import (credentials, devlist, dumpmgr, servervote, tclcheck,
|
||||
tclchecksum, tclencheader, tclrequest, xmltools)
|
||||
|
||||
|
||||
def default_enum(enumname, vardict, qualroot="tcllib.FotaCheck"):
|
||||
return enum.IntEnum(enumname, vardict, module=__name__, qualname="{}.{}".format(qualroot, enumname))
|
||||
|
||||
|
||||
class FotaCheck(
|
||||
tclcheck.TclCheckMixin,
|
||||
@ -27,7 +24,7 @@ class FotaCheck(
|
||||
devlist.DevListMixin,
|
||||
dumpmgr.DumpMgrMixin,
|
||||
xmltools.XmlToolsMixin
|
||||
):
|
||||
):
|
||||
VDKEY = b"eJwdjwEOwDAIAr8kKFr//7HhmqXp8AIIDrYAgg8byiUXrwRJRXja+d6iNxu0AhUooDCN9rd6rDLxmGIakUVWo3IGCTRWqCAt6X4jGEIUAxgN0eYWnp+LkpHQAg/PsO90ELsy0Npm/n2HbtPndFgGEV31R9OmT4O4nrddjc3Qt6nWscx7e+WRHq5UnOudtjw5skuV09pFhvmqnOEIs4ljPeel1wfLYUF4\n"
|
||||
CKTP = default_enum("CKTP", ["AUTO", "MANUAL"])
|
||||
MODE = default_enum("MODE", {"OTA": 2, "FULL": 4})
|
||||
@ -40,15 +37,15 @@ class FotaCheck(
|
||||
super().__init__()
|
||||
self.serid = "543212345000000"
|
||||
self.curef = "PRD-63117-011"
|
||||
self.fv = "AAM481"
|
||||
self.osvs = "7.1.1"
|
||||
self.mode = self.MODE.FULL
|
||||
self.fv = "AAM481"
|
||||
self.osvs = "7.1.1"
|
||||
self.mode = self.MODE.FULL
|
||||
self.ftype = "Firmware"
|
||||
self.cltp = self.CLTP.MOBILE
|
||||
self.cktp = self.CKTP.MANUAL
|
||||
self.ckot = self.CKOT.ALL
|
||||
self.rtd = self.RTD.UNROOTED
|
||||
self.chnl = self.CHNL.WIFI
|
||||
self.cltp = self.CLTP.MOBILE
|
||||
self.cktp = self.CKTP.MANUAL
|
||||
self.ckot = self.CKOT.ALL
|
||||
self.rtd = self.RTD.UNROOTED
|
||||
self.chnl = self.CHNL.WIFI
|
||||
self.reset_session()
|
||||
|
||||
def reset_session(self):
|
||||
|
@ -3,8 +3,8 @@
|
||||
import platform
|
||||
|
||||
# Needed to make ANSI escape sequences work in Windows
|
||||
system = platform.system()
|
||||
if system == "Windows":
|
||||
SYSTEM = platform.system()
|
||||
if SYSTEM == "Windows":
|
||||
try:
|
||||
import colorama
|
||||
colorama.init()
|
||||
|
@ -5,6 +5,7 @@
|
||||
import argparse
|
||||
import webbrowser
|
||||
|
||||
|
||||
class DefaultParser(argparse.ArgumentParser):
|
||||
def __init__(self, appname, desc=None):
|
||||
super().__init__(prog=appname, description=desc, epilog="https://github.com/mbirth/tcl_ota_check")
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
import base64
|
||||
|
||||
|
||||
class CredentialsMixin:
|
||||
@staticmethod
|
||||
def get_creds():
|
||||
|
@ -2,14 +2,18 @@
|
||||
|
||||
import json
|
||||
import os
|
||||
import requests
|
||||
import time
|
||||
|
||||
import requests
|
||||
|
||||
from . import ansi
|
||||
|
||||
|
||||
DEVICELIST_URL = "https://tclota.birth-online.de/json_lastupdates.php"
|
||||
DEVICELIST_FILE = "prds.json"
|
||||
DEVICELIST_CACHE_SECONDS = 86400
|
||||
|
||||
|
||||
class DevListMixin:
|
||||
@staticmethod
|
||||
def get_devicelist(force=False, output_diff=True):
|
||||
@ -21,18 +25,18 @@ class DevListMixin:
|
||||
filemtime = filestat.st_mtime
|
||||
if filemtime > time.time() - DEVICELIST_CACHE_SECONDS:
|
||||
need_download = False
|
||||
with open(DEVICELIST_FILE, "rt") as df:
|
||||
old_prds = json.load(df)
|
||||
with open(DEVICELIST_FILE, "rt") as dlfile:
|
||||
old_prds = json.load(dlfile)
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
if need_download or force:
|
||||
prds_json = requests.get(DEVICELIST_URL).text
|
||||
with open(DEVICELIST_FILE, "wt") as df:
|
||||
df.write(prds_json)
|
||||
with open(DEVICELIST_FILE, "wt") as dlfile:
|
||||
dlfile.write(prds_json)
|
||||
|
||||
with open(DEVICELIST_FILE, "rt") as df:
|
||||
prds = json.load(df)
|
||||
with open(DEVICELIST_FILE, "rt") as dlfile:
|
||||
prds = json.load(dlfile)
|
||||
|
||||
if old_prds and output_diff:
|
||||
DevListMixin.print_prd_diff(old_prds, prds)
|
||||
|
@ -3,8 +3,10 @@
|
||||
import errno
|
||||
import glob
|
||||
import os
|
||||
|
||||
from . import ansi
|
||||
|
||||
|
||||
class DumpMgrMixin:
|
||||
def __init__(self):
|
||||
self.last_dump_filename = None
|
||||
@ -14,11 +16,11 @@ class DumpMgrMixin:
|
||||
if not os.path.exists(os.path.dirname(outfile)):
|
||||
try:
|
||||
os.makedirs(os.path.dirname(outfile))
|
||||
except OSError as e:
|
||||
if e.errno != errno.EEXIST:
|
||||
except OSError as err:
|
||||
if err.errno != errno.EEXIST:
|
||||
raise
|
||||
with open(outfile, "w", encoding="utf-8") as f:
|
||||
f.write(data)
|
||||
with open(outfile, "w", encoding="utf-8") as fhandle:
|
||||
fhandle.write(data)
|
||||
self.last_dump_filename = outfile
|
||||
|
||||
def delete_last_dump(self):
|
||||
@ -29,9 +31,9 @@ class DumpMgrMixin:
|
||||
@staticmethod
|
||||
def write_info_if_dumps_found():
|
||||
# To disable this info, uncomment the following line.
|
||||
#return
|
||||
# return
|
||||
files = glob.glob(os.path.normpath("logs/*.xml"))
|
||||
if len(files) > 0:
|
||||
if files:
|
||||
print()
|
||||
print("{}There are {} logs collected in the logs/ directory.{} Please consider uploading".format(ansi.YELLOW, len(files), ansi.RESET))
|
||||
print("them to https://tclota.birth-online.de/ by running {}./upload_logs.py{}.".format(ansi.CYAN, ansi.RESET))
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
import numpy
|
||||
|
||||
|
||||
class ServerVoteMixin:
|
||||
def __init__(self):
|
||||
self.g2master = None
|
||||
@ -41,7 +42,7 @@ class ServerVoteMixin:
|
||||
self.check_time_count += 1
|
||||
|
||||
def check_time_avg(self):
|
||||
return (self.check_time_sum / self.check_time_count)
|
||||
return self.check_time_sum / self.check_time_count
|
||||
|
||||
def master_server_vote_on_time(self, last_duration, avg_duration):
|
||||
if last_duration < avg_duration - 0.5:
|
||||
|
@ -2,28 +2,30 @@
|
||||
|
||||
import time
|
||||
from collections import OrderedDict
|
||||
|
||||
import requests
|
||||
from defusedxml import ElementTree
|
||||
|
||||
|
||||
class TclCheckMixin:
|
||||
def do_check(self, https=True, timeout=10, max_tries=5):
|
||||
protocol = "https://" if https else "http://"
|
||||
url = protocol + self.g2master + "/check.php"
|
||||
params = OrderedDict()
|
||||
params["id"] = self.serid
|
||||
params["id"] = self.serid
|
||||
params["curef"] = self.curef
|
||||
params["fv"] = self.fv
|
||||
params["mode"] = self.mode.value
|
||||
params["type"] = self.ftype
|
||||
params["cltp"] = self.cltp.value
|
||||
params["cktp"] = self.cktp.value
|
||||
params["rtd"] = self.rtd.value
|
||||
params["chnl"] = self.chnl.value
|
||||
#params["osvs"] = self.osvs
|
||||
#params["ckot"] = self.ckot.value
|
||||
params["fv"] = self.fvver
|
||||
params["mode"] = self.mode.value
|
||||
params["type"] = self.ftype
|
||||
params["cltp"] = self.cltp.value
|
||||
params["cktp"] = self.cktp.value
|
||||
params["rtd"] = self.rtd.value
|
||||
params["chnl"] = self.chnl.value
|
||||
#params["osvs"] = self.osvs
|
||||
#params["ckot"] = self.ckot.value
|
||||
|
||||
last_response = None
|
||||
for num_try in range(0, max_tries):
|
||||
for _ in range(0, max_tries):
|
||||
try:
|
||||
reqtime_start = time.perf_counter()
|
||||
req = self.sess.get(url, params=params, timeout=timeout)
|
||||
@ -59,12 +61,12 @@ class TclCheckMixin:
|
||||
def parse_check(xmlstr):
|
||||
root = ElementTree.fromstring(xmlstr)
|
||||
curef = root.find("CUREF").text
|
||||
fv = root.find("VERSION").find("FV").text
|
||||
tv = root.find("VERSION").find("TV").text
|
||||
fvver = root.find("VERSION").find("FV").text
|
||||
tvver = root.find("VERSION").find("TV").text
|
||||
fw_id = root.find("FIRMWARE").find("FW_ID").text
|
||||
fileinfo = root.find("FIRMWARE").find("FILESET").find("FILE")
|
||||
fileid = fileinfo.find("FILE_ID").text
|
||||
fileid = fileinfo.find("FILE_ID").text
|
||||
filename = fileinfo.find("FILENAME").text
|
||||
filesize = fileinfo.find("SIZE").text
|
||||
filehash = fileinfo.find("CHECKSUM").text
|
||||
return curef, fv, tv, fw_id, fileid, filename, filesize, filehash
|
||||
return curef, fvver, tvver, fw_id, fileid, filename, filesize, filehash
|
||||
|
@ -1,8 +1,10 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import json
|
||||
|
||||
from defusedxml import ElementTree
|
||||
|
||||
|
||||
class TclChecksumMixin:
|
||||
def do_checksum(self, encslave, address, uri):
|
||||
url = "http://" + encslave + "/checksum.php"
|
||||
@ -10,12 +12,12 @@ class TclChecksumMixin:
|
||||
|
||||
payload = {address: uri}
|
||||
payload_json = json.dumps(payload)
|
||||
params[b"address"] = bytes(payload_json, "utf-8")
|
||||
params[b"address"] = bytes(payload_json, "utf-8")
|
||||
|
||||
#print(repr(dict(params)))
|
||||
# print(repr(dict(params)))
|
||||
req = self.sess.post(url, data=params)
|
||||
if req.status_code == 200:
|
||||
req.encoding = "utf-8" # Force encoding as server doesn't give one
|
||||
req.encoding = "utf-8" # Force encoding as server doesn't give one
|
||||
self.write_dump(req.text)
|
||||
# <ENCRYPT_FOOTER>2abfa6f6507044fec995efede5d818e62a0b19b5</ENCRYPT_FOOTER> means ERROR (invalid ADDRESS!)
|
||||
if "<ENCRYPT_FOOTER>2abfa6f6507044fec995efede5d818e62a0b19b5</ENCRYPT_FOOTER>" in req.text:
|
||||
|
@ -1,5 +1,6 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
|
||||
class TclEncHeaderMixin:
|
||||
def do_encrypt_header(self, encslave, address):
|
||||
params = self.get_creds2()
|
||||
|
@ -4,11 +4,13 @@ import binascii
|
||||
import hashlib
|
||||
import random
|
||||
import time
|
||||
from math import floor
|
||||
import zlib
|
||||
from collections import OrderedDict
|
||||
from math import floor
|
||||
|
||||
from defusedxml import ElementTree
|
||||
|
||||
|
||||
'''
|
||||
private HashMap<String, String> buildDownloadUrisParams(UpdatePackageInfo updatePackageInfo) {
|
||||
FotaLog.m28v(TAG, "doAfterCheck");
|
||||
@ -31,6 +33,7 @@ from defusedxml import ElementTree
|
||||
}
|
||||
'''
|
||||
|
||||
|
||||
class TclRequestMixin:
|
||||
@staticmethod
|
||||
def get_salt():
|
||||
@ -41,10 +44,10 @@ class TclRequestMixin:
|
||||
def get_vk2(self, params_dict, cltp):
|
||||
params_dict["cltp"] = cltp
|
||||
query = ""
|
||||
for k, v in params_dict.items():
|
||||
if len(query) > 0:
|
||||
for key, val in params_dict.items():
|
||||
if query:
|
||||
query += "&"
|
||||
query += k + "=" + str(v)
|
||||
query += key + "=" + str(val)
|
||||
vdk = zlib.decompress(binascii.a2b_base64(self.VDKEY))
|
||||
query += vdk.decode("utf-8")
|
||||
engine = hashlib.sha1()
|
||||
@ -52,29 +55,29 @@ class TclRequestMixin:
|
||||
hexhash = engine.hexdigest()
|
||||
return hexhash
|
||||
|
||||
def do_request(self, curef, fv, tv, fw_id):
|
||||
def do_request(self, curef, fvver, tvver, fw_id):
|
||||
url = "https://" + self.g2master + "/download_request.php"
|
||||
params = OrderedDict()
|
||||
params["id"] = self.serid
|
||||
params["salt"] = self.get_salt()
|
||||
params["id"] = self.serid
|
||||
params["salt"] = self.get_salt()
|
||||
params["curef"] = curef
|
||||
params["fv"] = fv
|
||||
params["tv"] = tv
|
||||
params["type"] = self.ftype
|
||||
params["fv"] = fvver
|
||||
params["tv"] = tvver
|
||||
params["type"] = self.ftype
|
||||
params["fw_id"] = fw_id
|
||||
params["mode"] = self.mode.value
|
||||
params["vk"] = self.get_vk2(params, self.cltp.value)
|
||||
params["cltp"] = self.cltp.value
|
||||
params["cktp"] = self.cktp.value
|
||||
params["rtd"] = self.rtd.value
|
||||
params["mode"] = self.mode.value
|
||||
params["vk"] = self.get_vk2(params, self.cltp.value)
|
||||
params["cltp"] = self.cltp.value
|
||||
params["cktp"] = self.cktp.value
|
||||
params["rtd"] = self.rtd.value
|
||||
if self.mode == self.MODE.FULL:
|
||||
params["foot"] = 1
|
||||
params["chnl"] = self.chnl.value
|
||||
params["foot"] = 1
|
||||
params["chnl"] = self.chnl.value
|
||||
|
||||
#print(repr(dict(params)))
|
||||
# print(repr(dict(params)))
|
||||
req = self.sess.post(url, data=params)
|
||||
if req.status_code == 200:
|
||||
req.encoding = "utf-8" # Force encoding as server doesn't give one
|
||||
req.encoding = "utf-8" # Force encoding as server doesn't give one
|
||||
self.write_dump(req.text)
|
||||
return req.text
|
||||
else:
|
||||
@ -87,7 +90,7 @@ class TclRequestMixin:
|
||||
def parse_request(xmlstr):
|
||||
root = ElementTree.fromstring(xmlstr)
|
||||
file = root.find("FILE_LIST").find("FILE")
|
||||
fileid = file.find("FILE_ID").text
|
||||
fileid = file.find("FILE_ID").text
|
||||
fileurl = file.find("DOWNLOAD_URL").text
|
||||
s3_fileurl_node = file.find("S3_DOWNLOAD_URL")
|
||||
s3_fileurl = ""
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
import xml.dom.minidom
|
||||
|
||||
|
||||
class XmlToolsMixin:
|
||||
@staticmethod
|
||||
def pretty_xml(xmlstr):
|
||||
|
@ -7,13 +7,15 @@
|
||||
|
||||
import glob
|
||||
import os
|
||||
|
||||
import requests
|
||||
|
||||
|
||||
# This is the URL to an installation of https://github.com/mbirth/tcl_update_db
|
||||
UPLOAD_URL = "https://tclota.birth-online.de/"
|
||||
LOGS_GLOB = os.path.normpath("logs/*.xml")
|
||||
|
||||
headers = { "Content-Type": "text/xml" }
|
||||
headers = {"Content-Type": "text/xml"}
|
||||
|
||||
file_list = glob.glob(LOGS_GLOB)
|
||||
print("Found {} logs to upload.".format(len(file_list)))
|
||||
@ -39,4 +41,3 @@ try:
|
||||
print(" OK")
|
||||
except OSError as e:
|
||||
print(" FAILED")
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user