Files
global-catch-up/catchup.py
Markus Birth c6299c4e0c Colours!
Signed-off-by: Markus Birth <markus@birth-online.de>
2025-12-18 22:20:38 +00:00

115 lines
4.2 KiB
Python
Executable File

#!/usr/bin/env python3
import os
from os.path import basename, dirname, isfile
import requests
import re
import json
from rich import print
from rich.progress import Progress, ProgressColumn, TextColumn, BarColumn, MofNCompleteColumn, DownloadColumn, TimeRemainingColumn, RenderableType, Task
START_URL="https://www.globalplayer.com/catchup/wsqk/uk/"
DEFAULT_HEADERS = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:146.0) Gecko/20100101 Firefox/146.0"
}
def getPageProps(url):
global DEFAULT_HEADERS
r = requests.get(url, headers=DEFAULT_HEADERS)
html = r.text
#print(html)
propmatch = re.search('__NEXT_DATA__.*>(.*)</script>', html, re.MULTILINE)
#print(repr(propmatch.group(1)))
props = json.loads(propmatch.group(1))
return props["props"]["pageProps"]
main_props = getPageProps(START_URL)
shows = main_props["catchupInfo"]
shows_done = 0
# https://github.com/Textualize/rich/discussions/1500#discussioncomment-14644763
class MyColumn(ProgressColumn):
def __init__(
self,
default_column: ProgressColumn | None = None,
alt_column: ProgressColumn | None = None
) -> None:
super().__init__()
self._default_column = default_column
self._alt_column = alt_column
def render(self, task: Task) -> RenderableType:
if task.fields.get('use_alt_column'):
if self._alt_column:
return self._alt_column.render(task)
else:
if self._default_column:
return self._default_column.render(task)
return Text()
with Progress(
TextColumn("[progress.description]{task.description}"),
BarColumn(),
MyColumn(MofNCompleteColumn(), DownloadColumn()),
TimeRemainingColumn()
) as prog:
task_show = prog.add_task("Processing show...", total=len(shows))
task_epi = prog.add_task("Processing episode...", start=False)
for show in shows:
(station, title) = show["title"].split(" | ", 2)
prog.update(task_show, description=f"Show: [bold magenta]{title}")
show_url = START_URL + show["id"] + "/"
#print(show_url)
show_props = getPageProps(show_url)
#print(show_props)
episodes = show_props["catchupInfo"]["episodes"]
episodes_done = 0
prog.start_task(task_epi)
prog.update(task_epi, completed=0, total=len(episodes))
for episode in episodes:
episode_url = episode["streamUrl"]
#print("Stream URL: " + episode_url)
episode_aired = episode["startDate"]
episode_title = episode["title"].split(" | ", 2)[1]
episode_date = episode_aired[0:10]
episode_time = episode_aired[11:19]
prog.update(task_epi, description=f"Episode: [bold green]{episode_date}[/bold green] / [bold yellow]{episode_time}")
destination_path = station + "/" + episode_date + "/" + episode_time.replace(":", "-") + " " + episode_title + ".m4a"
#print("Download to: " + destination_path)
os.makedirs(dirname(destination_path), exist_ok=True)
if not isfile(destination_path):
task_dl = prog.add_task(f"Downloading [bold cyan]{episode_date}/{basename(destination_path)}", use_alt_column=True)
try:
with requests.get(episode_url, headers=DEFAULT_HEADERS, stream=True) as r:
r.raise_for_status()
dl_size = int(r.headers["content-length"])
prog.update(task_dl, total=dl_size)
dl_done = 0
with open(destination_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
dl_done += len(chunk)
prog.update(task_dl, completed=dl_done)
except:
print("Error downloading file.")
episodes_done += 1
prog.update(task_epi, completed=episodes_done)
shows_done += 1
prog.update(task_show, completed=shows_done)