Files
2026-05-15 18:07:25 +01:00

231 lines
10 KiB
Python

import configparser
import glob
import json
import requests
import os
import os.path
import pickle
from collections.abc import Callable
from atproto import Client, client_utils
from rich import print
from rich.progress import Progress, ProgressColumn, TextColumn, BarColumn, MofNCompleteColumn, TimeRemainingColumn
from time import sleep
class BlueBackBlocker():
    """Authenticated BlueSky session used to read and extend one of the user's lists.

    Attributes set by ``__init__``:
        agent:   the atproto ``Client`` every request goes through
        profile: the value returned by ``Client.login``
        did:     DID of the logged-in account, read from ``agent.me.did``
    """

    # Directory holding the pickled list cache.
    CACHE_DIR = "cache"
    # Pause after each request: poor man's way of staying under 5 requests per second.
    REQUEST_DELAY = 0.20

    def __init__(self, username: str, password: str):
        """Log in with the given credentials and remember the account's DID."""
        self.agent = Client()
        self.profile = self.agent.login(username, password)
        self.did = self.agent.me.did

    def _list_uri(self, list_id: str) -> str:
        """Return the AT-URI of the list ``list_id`` in the logged-in user's repo."""
        return f"at://{self.did}/app.bsky.graph.list/{list_id}"

    def get_lists(self):
        """Return the raw ``app.bsky.graph.getLists`` response for the logged-in user."""
        return self.agent.app.bsky.graph.get_lists({'actor': self.did})

    def get_list(self, list_id: str, progress: Callable[[int, int], None] = None):
        """Fetch all members of one of the user's lists, page by page.

        The combined result is cached in ``cache/bs_<list_id>.pickle``; when that
        file exists it is returned as-is and no request is made.

        Args:
            list_id: record key of the list (the last segment of its AT-URI).
            progress: optional callback invoked after every page with
                ``(members_loaded_so_far, expected_total)``.

        Returns:
            A dict with the keys ``"items"`` (all list items) and ``"list"``
            (the list description taken from the last response).
        """
        cachefile = f"{self.CACHE_DIR}/bs_{list_id}.pickle"
        if os.path.isfile(cachefile):
            # SECURITY NOTE: pickle executes code on load. This is only acceptable
            # because the file is written by this very method; never point it at
            # a file from an untrusted source.
            with open(cachefile, "rb") as f:
                return pickle.load(f)

        list_uri = self._list_uri(list_id)
        cursor = None
        members = []
        while True:
            # The response carries the keys "items", "list", and "cursor".
            r = self.agent.app.bsky.graph.get_list({"list": list_uri, "limit": 100, "cursor": cursor})
            members += r["items"]
            if progress:
                # NOTE(review): the "- 1" is kept from the original code; the
                # reason for it is not visible here - confirm it is intended.
                progress(len(members), r["list"]["list_item_count"]-1)
            sleep(self.REQUEST_DELAY)
            if not r["cursor"]:
                break
            cursor = r["cursor"]

        results = {
            "items": members,
            "list": r["list"],
        }
        # Make sure the cache directory exists; otherwise the write below would
        # raise after all pages were fetched and the work would be lost.
        os.makedirs(os.path.dirname(cachefile), exist_ok=True)
        with open(cachefile, "wb") as f:
            pickle.dump(results, f)
        return results

    def add_to_list(self, list_id: str, user_did: str):
        """Create a list-item record that puts ``user_did`` on the list ``list_id``.

        Returns:
            True when the response's ``validation_status`` equals ``"valid"``.
        """
        r = self.agent.com.atproto.repo.create_record({
            "repo": self.did,
            "collection": "app.bsky.graph.listitem",
            "record": {
                "$type": "app.bsky.graph.listitem",
                "subject": user_did,
                "list": self._list_uri(list_id),
                "createdAt": self.agent.get_current_time_iso(),
            },
        })
        sleep(self.REQUEST_DELAY)
        return (r["validation_status"] == "valid")
class ClearSky():
    """Client for the anonymous ClearSky API with an on-disk JSON cache per page.

    Every response is stored as ``cache/cs_<endpoint path>.json`` and reused on
    later calls until the file is deleted.
    """

    # Req limits: 5 per second (anon) / 30 per second (auth'd)
    BASE_URL = "https://public.api.clearsky.services"
    # A page shorter than this is treated as the last one.
    PAGE_SIZE = 100
    # Seconds to wait for the server before giving up on a request.
    REQUEST_TIMEOUT = 30

    def __init__(self, did):
        """Remember the DID that all per-user queries are made for."""
        self.did = did

    def _fetch_page(self, uri: str, page: int = 1):
        """Return the decoded JSON of one page of ``uri``, from cache if possible.

        Page 1 has no suffix; page 2 and up are requested as ``<uri>/2``, ``<uri>/3``...

        Raises:
            requests.HTTPError: when the server answers with an error status.
                Failing here keeps error bodies out of the cache.
        """
        path = uri if page <= 1 else f"{uri}/{page}"
        cachefile = "cache/cs_" + path.replace("/", "_") + ".json"
        if os.path.isfile(cachefile):
            with open(cachefile, "rt") as f:
                return json.load(f)

        response = requests.get(self.BASE_URL + "/api/v1/" + path, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        r = response.json()
        os.makedirs(os.path.dirname(cachefile), exist_ok=True)
        with open(cachefile, "wt") as f:
            json.dump(r, f, indent=2)
        sleep(0.20)  # Poor man's way of making sure to not go over 5 requests per second
        return r

    def _get(self, uri: str, data_key: str = None, progress: Callable[[int, int], None] = None):
        """Collect the list found under ``data`` (or ``data[data_key]``) over all pages.

        Args:
            uri: endpoint path below ``/api/v1/``.
            data_key: key inside the response's ``"data"`` object that holds the
                list; when falsy, ``"data"`` itself is taken as the list.
            progress: optional callback invoked after every page with
                ``(items_so_far, total)``. While more pages may follow, the
                total is reported as one more than the running count.

        Returns:
            One list with the items of all pages, in page order.
        """
        result = []
        page = 1
        while True:
            r = self._fetch_page(uri, page)
            items = r["data"][data_key] if data_key else r["data"]
            result += items
            if len(items) < self.PAGE_SIZE:
                # Short page: this was the last one, so the total is now known.
                if progress:
                    progress(len(result), len(result))
                break
            if progress:
                progress(len(result), len(result) + 1)
            page += 1
        return result

    def get_did(self, username: str) -> str:
        """Return the DID that ClearSky reports for the handle ``username``."""
        # The value is a single string, not a paged list, so it must not go
        # through _get(), which would split it into individual characters.
        return self._fetch_page("anon/get-did/" + username)["data"]["did_identifier"]

    def get_lists(self, progress: Callable[[int, int], None] = None):
        """Returns ALL lists the user is on."""
        return self._get("anon/get-list/" + self.did, "lists", progress)

    def get_blocking_users(self, progress: Callable[[int, int], None] = None):
        """Returns blocks from single users."""
        return self._get("anon/single-blocklist/" + self.did, "blocklist", progress)

    def get_subscribed_blocklists(self, progress: Callable[[int, int], None] = None):
        """Returns lists where the user is on and people subscribed to for blocking."""
        return self._get("anon/subscribe-blocks-single-blocklist/" + self.did, "blocklists", progress)

    def get_subscribed_users(self, blocklist_url: str, progress: Callable[[int, int], None] = None):
        """Returns the users that subscribed to the given blocklist."""
        return self._get("anon/subscribe-blocks-single-blocklist/users/" + blocklist_url, "users", progress)
def main():
    """Put everyone who blocks the configured account onto one of its own lists.

    Steps:
      1. Read the ``[BlueSky]`` section (``Username``, ``Password``, ``ListId``)
         from ``config.ini`` and log in.
      2. Load the current members of the target list.
      3. Ask ClearSky for users blocking the account directly, for subscribed
         blocklists the account is on (their owners), and for the subscribers
         of those lists.
      4. Add every DID found that is not yet on the list.
      5. Delete the API caches and the state file.

    DIDs that were added successfully are appended to ``cache/successes.txt``
    right away, so an interrupted run can be resumed without adding them again.
    """
    config = configparser.ConfigParser()
    config.read("config.ini")
    username = config["BlueSky"]["Username"]
    list_id = config["BlueSky"]["ListId"]

    # The API caches and the state file all live here; create it up front so
    # the first write cannot fail on a fresh checkout.
    os.makedirs("cache", exist_ok=True)
    success_file = "cache/successes.txt"

    print(f"Logging into BlueSky as: [bold cyan]{username}")
    blocker = BlueBackBlocker(username, config["BlueSky"]["Password"])
    print(f"Logged into BlueSky, display name: [bold blue]{blocker.profile.display_name}[/bold blue], ", end="")
    print(f"DID: [bold yellow]{blocker.did}")

    with Progress(
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        TimeRemainingColumn(),
        transient=True
    ) as prog:
        task_bsky_list = prog.add_task(f"Loading list with id [bold white]{list_id}")
        blocklist = blocker.get_list(list_id, lambda cur, tot: prog.update(task_bsky_list, completed=cur, total=tot))
        print(f"Loaded list: [bold green]{blocklist['list']['name']}[/bold green] with [bold yellow]{len(blocklist['items'])}[/bold yellow] entries.")

        already_blocked = {item["subject"]["did"] for item in blocklist["items"]}
        print(f"Processed {len(blocklist['items'])} records, got {len(already_blocked)} unique entries.")

        cs = ClearSky(blocker.did)
        to_block = set()

        # People that blocked me directly (without using a list)
        task_csky_users = prog.add_task("Loading users that blocked you directly")
        blocked_by = cs.get_blocking_users(lambda cur, tot: prog.update(task_csky_users, completed=cur, total=tot))
        print(f"ClearSky reports [bold yellow]{len(blocked_by)}[/bold yellow] users blocking you directly.")
        to_block |= {item["did"] for item in blocked_by} - already_blocked
        print(f"[bold red]{len(to_block)}[/bold red] users are not on your list yet. ([bold green]{len(blocked_by)-len(to_block)}[/bold green] are.)")

        # Lists I'm on that people subscribed to
        task_csky_lists = prog.add_task("Loading subscribed blocklists you're on")
        subscribed_blocklists_im_on = cs.get_subscribed_blocklists(lambda cur, tot: prog.update(task_csky_lists, completed=cur, total=tot))
        print(f"ClearSky returned [bold yellow]{len(subscribed_blocklists_im_on)}[/bold yellow] blocklists you're on and others subscribed to.")
        prev_len = len(to_block)
        to_block |= {item["list_owner"] for item in subscribed_blocklists_im_on} - already_blocked
        print(f"[bold red]{len(to_block)-prev_len}[/bold red] list owners are not on your list yet.")

        # Subscribers of those lists
        for item in subscribed_blocklists_im_on:
            print(f"Querying subscribers of [bold magenta]{item['list_name']}[/bold magenta]")
            subscribers = cs.get_subscribed_users(item["list_url"])
            # end=" " so the two sentences of this line do not run together.
            print(f"Found [bold yellow]{len(subscribers)}[/bold yellow] subscribers.", end=" ")
            prev_len = len(to_block)
            to_block |= {sub["did"] for sub in subscribers} - already_blocked
            print(f"[bold red]{len(to_block)-prev_len}[/bold red] subscribers are not on your list yet.")
        print(f"Totals: Found [bold red]{len(to_block)}[/bold red] users not already on list.")

        # Resume support: skip everyone a previous, interrupted run already added.
        if os.path.isfile(success_file):
            print("Found state file. Processing...")
            with open(success_file, "rt") as f:
                done_before = {line.strip() for line in f}
            removed = len(to_block & done_before)
            to_block -= done_before
            print(f"Removed [bold green]{removed}[/bold green] users that were already blocked on a previous run.")

        task_add_to_list = prog.add_task("Adding users to list", total=len(to_block))
        users_processed = 0
        users_error = 0
        for did_to_block in to_block:
            if blocker.add_to_list(list_id, did_to_block):
                # Reopen per entry so the state is on disk even if a later
                # request raises; only successes are recorded, so failures
                # are retried when the run is resumed.
                with open(success_file, "at") as f:
                    f.write(did_to_block + "\n")
            else:
                print(f"[bold red]ERROR adding user [bold blue]{did_to_block}[/bold blue] to your list.")
                users_error += 1
            users_processed += 1
            prog.update(task_add_to_list, completed=users_processed)

    print(f"Processed {users_processed} people, got {users_error} errors.")
    print("Removing BlueSky API cache file.")
    for f in glob.glob("cache/bs_*.pickle"):
        os.remove(f)
    print("Removing ClearSky API caches.")
    for f in glob.glob("cache/cs_*.json"):
        os.remove(f)
    print("Removing state cache.")
    # The state file only exists if at least one user was added in this or an
    # earlier run; removing it unconditionally would raise FileNotFoundError.
    if os.path.isfile(success_file):
        os.remove(success_file)
# Run the tool only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()