Initial commit

Signed-off-by: Markus Birth <markus@birth-online.de>
This commit is contained in:
2025-08-03 01:38:31 +01:00
commit 732e7edba4
18 changed files with 2650 additions and 0 deletions
+149
View File
@@ -0,0 +1,149 @@
import json
import time
import urequests
class TflCountdown:
API_URL = "https://countdown.api.tfl.gov.uk/interfaces/ura/instant_V1"
FIELD_NAMES = [
# ResponseType 0 - Stop
["ResponseType", "StopPointName", "StopID", "StopCode1", "StopCode2", "StopPointType", "Towards", "Bearing", "StopPointIndicator", "StopPointState", "Latitude", "Longitude"],
# ResponseType 1 - Prediction
["ResponseType", "StopPointName", "StopID", "StopCode1", "StopCode2", "StopPointType", "Towards", "Bearing", "StopPointIndicator", "StopPointState", "Latitude", "Longitude", "VisitNumber", "LineID", "LineName", "DirectionID", "DestinationText", "DestinationName", "VehicleID", "TripID", "RegistrationNumber", "EstimatedTime", "ExpireTime"],
# ResponseType 2 - Flexible Message
["ResponseType", "StopPointName", "StopID", "StopCode1", "StopCode2", "StopPointType", "Towards", "Bearing", "StopPointIndicator", "StopPointState", "Latitude", "Longitude", "MessageUUID", "MessageType", "MessagePriority", "MessageText", "StartTime", "ExpireTime"],
# ResponseType 3 - Baseversion
["ResponseType", "Version"],
# ResponseType 4 - URA Version
["ResponseType", "Version", "TimeStamp"]
]
def __init__(self, api_key: str):
self.api_key = api_key
self.return_list = ["StopPointName", "StopID", "Towards", "LineName", "DestinationText", "EstimatedTime", "MessageText"]
self.stop_ids = ["1597", "1598", "11333", "11334"]
self.time_now = time.gmtime() * 1000
def get_countdown(self, stop_ids: list = ["1598", "11333"], line_ids: list = []):
url = self.API_URL
params = {
"StopAlso": "true",
"ReturnList": ",".join(self.return_list),
"StopID": ",".join(stop_ids),
"LineID": ",".join(line_ids)
}
url += self.get_query(params)
print(url)
result = urequests.get(url)
return result
def get_query(self, params: dict):
query_str = "?"
for k in params:
if params[k] == "":
continue
query_str += f"&{k}={params[k]}"
return query_str
def get_field(self, msg: list, field_name: str):
resp_type = msg[0]
try:
full_fields = self.FIELD_NAMES[resp_type]
except:
print(f"Unknown ResponseType: {resp_type}")
return None
if resp_type in [4]:
return_fields = full_fields
else:
return_fields = ["ResponseType"] + [f for f in self.return_list if f in full_fields]
#print(repr(return_fields))
#print(repr(msg))
try:
return_idx = return_fields.index(field_name)
except:
return None
return msg[return_idx]
def strftime(self, time_tuple: tuple[int, ...], date: bool = True, time: bool = True):
result = ""
if date:
result += f"{time_tuple[0]}-{time_tuple[1]:02}-{time_tuple[2]:02}"
if time: result += " "
if time:
result += f"{time_tuple[3]:02}:{time_tuple[4]:02}:{time_tuple[5]:02}"
return result
def strfstamp(self, gmstamp_ms: int, return_date: bool = True, return_time: bool = True):
tm_obj = time.gmtime(gmstamp_ms/1000)
return self.strftime(tm_obj, return_date, return_time)
def get_due(self, gmstamp_ms: int):
diff = gmstamp_ms - self.time_now
minutes = round(diff / 60000)
return f"{minutes}min"
def parse_countdown(self, ctdn_response: str):
"""
https://content.tfl.gov.uk/tfl-live-bus-river-bus-arrivals-api-documentation.pdf
Optimised for a value of:
"ReturnList": "StopPointName,StopID,LineName,DestinationText,EstimatedTime,MessageText"
"""
result = {}
lines = ctdn_response.split("\r\n")
for l in lines:
ld = json.loads(l)
print(repr(ld))
if ld[0] == 0:
# Stop record
stop_id = self.get_field(ld, "StopID")
if not "stops" in result:
result["stops"] = {}
result["stops"][stop_id] = {
"id": stop_id,
"name": self.get_field(ld, "StopPointName"),
"towards": self.get_field(ld, "Towards"),
"lines": {},
"messages": []
}
elif ld[0] == 1:
# Prediction record
stop_id = self.get_field(ld, "StopID")
line_no = self.get_field(ld, "LineName")
destination = self.get_field(ld, "DestinationText")
est_stamp = int(self.get_field(ld, "EstimatedTime"))
if not line_no in result["stops"][stop_id]["lines"]:
result["stops"][stop_id]["lines"][line_no] = []
result["stops"][stop_id]["lines"][line_no].append({
"destination": destination,
"est_gmstamp_ms": est_stamp,
"est_gmtime": self.strfstamp(est_stamp),
"est_due": self.get_due(est_stamp)
})
elif ld[0] == 2:
# Flexible Message record
stop_id = self.get_field(ld, "StopID")
msg = self.get_field(ld, "MessageText")
result["stops"][stop_id]["messages"].append(msg)
elif ld[0] == 3:
# Baseversion record
pass
elif ld[0] == 4:
# URA Version record
ura_stamp = int(self.get_field(ld, "TimeStamp"))
# Use as reference time - perfect for embedded systems without RTC
self.time_now = ura_stamp
result["ura"] = {
"version": self.get_field(ld, "Version"),
"gmstamp_ms": ura_stamp,
"gmtime": self.strfstamp(ura_stamp)
}
else:
print(f"Unsupported ResponseType: {ld[0]}")
# Sort arrivals by estimated time
for stop_id in result["stops"]:
for line_id in result["stops"][stop_id]["lines"]:
result["stops"][stop_id]["lines"][line_id] = sorted(result["stops"][stop_id]["lines"][line_id], key=lambda x: x["est_gmstamp_ms"])
return result