"""Add the accounts that block you on BlueSky to one of your own BlueSky lists.

The script logs into BlueSky, loads the list named in ``config.ini``
(section ``BlueSky``, keys ``Username``, ``Password``, ``ListId``), asks the
ClearSky API which accounts block the logged-in account (directly, as owners
of subscribed blocklists, or as subscribers of those lists) and adds every
account that is not yet on the list.

API responses are cached under ``cache/`` so an interrupted run can resume
without re-downloading; the caches are removed after a completed run.
"""
import configparser
import glob
import json
import os
import os.path
import pickle
from collections.abc import Callable
from time import sleep
from typing import Optional

import requests
from atproto import Client, client_utils
from rich import print
from rich.progress import Progress, ProgressColumn, TextColumn, BarColumn, MofNCompleteColumn, TimeRemainingColumn

# Directory holding API response caches and the resume state file.
CACHE_DIR = "cache"
# Poor man's way of making sure to not go over 5 requests per second.
REQUEST_DELAY = 0.20
# Seconds to wait for a ClearSky response before giving up.
REQUEST_TIMEOUT = 30

# Signature of progress callbacks: called as ``callback(completed, total)``.
ProgressCallback = Callable[[int, int], None]


class BlueBackBlocker():
    """Thin wrapper around an authenticated atproto ``Client``."""

    def __init__(self, username: str, password: str):
        """Log into BlueSky and remember the profile and DID of the account."""
        self.agent = Client()
        self.profile = self.agent.login(username, password)
        self.did = self.agent.me.did

    def get_lists(self):
        """Return the ``get_lists`` response for the logged-in account."""
        return self.agent.app.bsky.graph.get_lists({'actor': self.did})

    def get_list(self, list_id: str, progress: Optional[ProgressCallback] = None):
        """Return all members of one of the account's lists.

        The result is a dict with the keys ``"items"`` (all list members,
        collected across pages) and ``"list"`` (the list description from the
        last page). It is cached in ``cache/bs_<list_id>.pickle`` and served
        from there on later calls.

        ``progress``, when given, is called after every page with the number
        of members loaded so far and the expected total.
        """
        cachefile = os.path.join(CACHE_DIR, f"bs_{list_id}.pickle")
        if os.path.isfile(cachefile):
            # NOTE: pickle executes code on load. This is only acceptable
            # because the file is written by this script itself; never point
            # it at a file from an untrusted source.
            with open(cachefile, "rb") as f:
                return pickle.load(f)

        list_uri = f"at://{self.did}/app.bsky.graph.list/{list_id}"
        cursor = None
        members = []
        while True:
            # Returns keys "items", "list", and "cursor".
            r = self.agent.app.bsky.graph.get_list({"list": list_uri, "limit": 100, "cursor": cursor})
            members += r["items"]
            if progress:
                progress(len(members), r["list"]["list_item_count"]-1)
            sleep(REQUEST_DELAY)
            if not r["cursor"]:
                break
            cursor = r["cursor"]

        results = {
            "items": members,
            "list": r["list"]
        }
        # The cache directory does not exist on a fresh checkout.
        os.makedirs(CACHE_DIR, exist_ok=True)
        with open(cachefile, "wb") as f:
            pickle.dump(results, f)
        return results

    def add_to_list(self, list_id: str, user_did: str):
        """Add ``user_did`` to the list ``list_id`` of the logged-in account.

        Returns True when the created record's ``validation_status`` is
        ``"valid"``, False otherwise.
        """
        list_uri = f"at://{self.did}/app.bsky.graph.list/{list_id}"
        r = self.agent.com.atproto.repo.create_record({
            "repo": self.did,
            "collection": "app.bsky.graph.listitem",
            "record": {
                "$type": "app.bsky.graph.listitem",
                "subject": user_did,
                "list": list_uri,
                "createdAt": self.agent.get_current_time_iso()
            }
        })
        sleep(REQUEST_DELAY)
        return (r["validation_status"] == "valid")


class ClearSky():
    """Client for the anonymous ClearSky API with on-disk response caching."""

    # Req limits: 5 per second (anon) / 30 per second (auth'd)
    BASE_URL = "https://public.api.clearsky.services"
    # 100 items per page; a shorter page is the last one.
    PAGE_SIZE = 100

    def __init__(self, did):
        """Remember the DID all account-specific queries are made for."""
        self.did = did

    def _fetch_page(self, uri: str, page: int = 1):
        """Return the decoded JSON of one page of ``uri``, using the cache.

        Page 1 has no suffix, page 2+ is requested as ``<uri>/2``, ``<uri>/3``
        and so on. Responses are cached as ``cache/cs_<path>.json`` with every
        ``/`` of the path replaced by ``_``.

        Raises ``requests.HTTPError`` for an error status; such responses are
        not cached, so a later run retries them instead of replaying them.
        """
        path = uri if page <= 1 else f"{uri}/{page}"
        cachefile = os.path.join(CACHE_DIR, "cs_" + path.replace("/", "_") + ".json")
        if os.path.isfile(cachefile):
            with open(cachefile, "rt", encoding="utf-8") as f:
                return json.load(f)

        response = requests.get(f"{self.BASE_URL}/api/v1/{path}", timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
        payload = response.json()
        # The cache directory does not exist on a fresh checkout.
        os.makedirs(CACHE_DIR, exist_ok=True)
        with open(cachefile, "wt", encoding="utf-8") as f:
            json.dump(payload, f, indent=2)
        # Only real requests count against the rate limit.
        sleep(REQUEST_DELAY)
        return payload

    def _get(self, uri: str, data_key: str = None, progress: Optional[ProgressCallback] = None):
        """Return the items of all pages of ``uri`` as one list.

        Each page's items are read from ``response["data"][data_key]``, or
        from ``response["data"]`` when ``data_key`` is not given.

        ``progress``, when given, is called after every page with the number
        of items collected so far. The total is not known in advance, so it
        is reported as one more than that number until the last page is
        reached.
        """
        result = []
        page = 1
        while True:
            payload = self._fetch_page(uri, page)
            items = payload["data"][data_key] if data_key else payload["data"]
            result += items
            if len(items) < self.PAGE_SIZE:
                if progress:
                    progress(len(result), len(result))
                break
            if progress:
                progress(len(result), len(result) + 1)
            page += 1
        return result

    def get_did(self, username: str) -> str:
        """Return the DID that ClearSky reports for ``username``."""
        # The value is a single string, not a paginated list, so it must not
        # go through _get(), which would split it into characters.
        return self._fetch_page("anon/get-did/" + username)["data"]["did_identifier"]

    def get_lists(self, progress: Optional[ProgressCallback] = None):
        """Returns ALL lists the user is on."""
        return self._get("anon/get-list/" + self.did, "lists", progress)

    def get_blocking_users(self, progress: Optional[ProgressCallback] = None):
        """Returns blocks from single users."""
        return self._get("anon/single-blocklist/" + self.did, "blocklist", progress)

    def get_subscribed_blocklists(self, progress: Optional[ProgressCallback] = None):
        """Returns lists where the user is on and people subscribed to for blocking."""
        return self._get("anon/subscribe-blocks-single-blocklist/" + self.did, "blocklists", progress)

    def get_subscribed_users(self, blocklist_url: str, progress: Optional[ProgressCallback] = None):
        """Returns the users that subscribed to the given blocklist."""
        return self._get("anon/subscribe-blocks-single-blocklist/users/" + blocklist_url, "users", progress)


def _progress_updater(prog, task_id):
    """Return a ``(completed, total)`` callback that updates ``task_id`` on ``prog``."""
    def update(completed, total):
        prog.update(task_id, completed=completed, total=total)
    return update


def _collect_targets(cs, prog, already_blocked):
    """Return the set of DIDs reported by ClearSky that are not in ``already_blocked``."""
    to_block = set()

    # People that blocked me directly (without using a list)
    task_csky_users = prog.add_task("Loading users that blocked you directly")
    blocked_by = cs.get_blocking_users(_progress_updater(prog, task_csky_users))
    print(f"ClearSky reports [bold yellow]{len(blocked_by)}[/bold yellow] users blocking you directly.")
    to_block |= {item["did"] for item in blocked_by} - already_blocked
    print(f"[bold red]{len(to_block)}[/bold red] users are not on your list yet. ([bold green]{len(blocked_by)-len(to_block)}[/bold green] are.)")

    # Lists I'm on that people subscribed to
    task_csky_lists = prog.add_task("Loading subscribed blocklists you're on")
    subscribed_blocklists_im_on = cs.get_subscribed_blocklists(_progress_updater(prog, task_csky_lists))
    print(f"ClearSky returned [bold yellow]{len(subscribed_blocklists_im_on)}[/bold yellow] blocklists you're on and others subscribed to.")
    prev_len = len(to_block)
    to_block |= {item["list_owner"] for item in subscribed_blocklists_im_on} - already_blocked
    print(f"[bold red]{len(to_block)-prev_len}[/bold red] list owners are not on your list yet.")

    # Subscribers of those lists
    for item in subscribed_blocklists_im_on:
        print(f"Querying subscribers of [bold magenta]{item['list_name']}[/bold magenta]")
        subscribers = cs.get_subscribed_users(item["list_url"])
        print(f"Found [bold yellow]{len(subscribers)}[/bold yellow] subscribers.", end=" ")
        prev_len = len(to_block)
        to_block |= {sub["did"] for sub in subscribers} - already_blocked
        print(f"[bold red]{len(to_block)-prev_len}[/bold red] subscribers are not on your list yet.")

    return to_block


def _drop_previously_added(to_block, success_file):
    """Remove from ``to_block`` every DID listed in ``success_file``, if that file exists."""
    if not os.path.isfile(success_file):
        return
    print("Found state file. Processing...")
    with open(success_file, "rt", encoding="utf-8") as f:
        done = {line.strip() for line in f}
    removed = len(to_block & done)
    to_block -= done
    print(f"Removed [bold green]{removed}[/bold green] users that were already blocked on a previous run.")


def _add_targets(blocker, prog, list_id, to_block, success_file):
    """Add every DID in ``to_block`` to the list; return ``(processed, errors)``.

    Each successfully added DID is appended to ``success_file`` right away so
    that an interrupted run can skip it next time.
    """
    task_add_to_list = prog.add_task("Adding users to list", total=len(to_block))
    users_processed = 0
    users_error = 0
    for did_to_block in to_block:
        users_processed += 1
        if blocker.add_to_list(list_id, did_to_block):
            # Record successes only, so failed additions are retried when a
            # run is resumed.
            with open(success_file, "at", encoding="utf-8") as f:
                f.write(did_to_block + "\n")
        else:
            print(f"[bold red]ERROR adding user [bold blue]{did_to_block}[/bold blue] to your list.")
            users_error += 1
        prog.update(task_add_to_list, completed=users_processed)
    return users_processed, users_error


def main():
    """Run the whole workflow: log in, collect blockers, add them, clean up."""
    config = configparser.ConfigParser()
    config.read("config.ini")
    username = config["BlueSky"]["Username"]
    list_id = config["BlueSky"]["ListId"]
    success_file = os.path.join(CACHE_DIR, "successes.txt")
    os.makedirs(CACHE_DIR, exist_ok=True)

    print(f"Logging into BlueSky as: [bold cyan]{username}")
    blocker = BlueBackBlocker(username, config["BlueSky"]["Password"])
    print(f"Logged into BlueSky, display name: [bold blue]{blocker.profile.display_name}[/bold blue], ", end="")
    print(f"DID: [bold yellow]{blocker.did}")

    with Progress(
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        TimeRemainingColumn(),
        transient=True
    ) as prog:
        task_bsky_list = prog.add_task(f"Loading list with id [bold white]{list_id}")
        blocklist = blocker.get_list(list_id, _progress_updater(prog, task_bsky_list))
        print(f"Loaded list: [bold green]{blocklist['list']['name']}[/bold green] with [bold yellow]{len(blocklist['items'])}[/bold yellow] entries.")

        already_blocked = {item["subject"]["did"] for item in blocklist["items"]}
        print(f"Processed {len(blocklist['items'])} records, got {len(already_blocked)} unique entries.")

        to_block = _collect_targets(ClearSky(blocker.did), prog, already_blocked)
        print(f"Totals: Found [bold red]{len(to_block)}[/bold red] users not already on list.")

        _drop_previously_added(to_block, success_file)
        users_processed, users_error = _add_targets(blocker, prog, list_id, to_block, success_file)

    print(f"Processed {users_processed} people, got {users_error} errors.")

    print("Removing BlueSky API cache file.")
    for path in glob.glob(os.path.join(CACHE_DIR, "bs_*.pickle")):
        os.remove(path)
    print("Removing ClearSky API caches.")
    for path in glob.glob(os.path.join(CACHE_DIR, "cs_*.json")):
        os.remove(path)
    print("Removing state cache.")
    # The state file only exists if at least one user was added in this or an
    # interrupted earlier run.
    if os.path.isfile(success_file):
        os.remove(success_file)


if __name__ == "__main__":
    main()