#! /usr/bin/python3
"""Planète Casio master shell.

Administrative command-line tool: lists database content (members, groups,
forums, trophies) and installs or updates it from the YAML files found in
the application's ``data`` directory. A command given on the command line is
executed once; otherwise an interactive prompt is started.
"""

from app import app, db
from app.models.user import Member, Group, GroupPrivilege
from app.models.priv import SpecialPrivilege
from app.models.trophy import Trophy, Title
from app.models.forum import Forum
from app.utils import unicode_names
import inspect
import os
import sys
import yaml
import slugify
# Imported for its side effect only: once loaded, input() gains line editing
# and history in the interactive prompt.
import readline
from PIL import Image

help_msg = """
This is the Planète Casio master shell. Type 'exit' or C-D to leave.
Type 'help' to print this message.

Listing commands:
  members                 Show registered community members
  groups                  Show privilege groups
  forums                  Show forum tree
  trophies                Show trophies

Install and update commands:
  update-groups           Create or update groups from app/data/
  update-forums           Create or update the forum tree from app/data/
  update-trophies         Create or update trophies
  generate-trophy-icons   Regenerate all trophy icons
  create-common-accounts  Remove and recreate 'Planète Casio' and 'GLaDOS'
  add-group <member> #<group-id>
                          Add <member> to group #<group-id> (presumably admins)
  enable-user <member>    Manually confirm member's email address
"""


def _load_data(filename, default):
    """Parse a YAML file from the application's data directory.

    Returns `default` when the file is empty, because yaml.safe_load()
    yields None for an empty document.
    """
    path = os.path.join(app.root_path, "data", filename)
    with open(path) as fp:
        data = yaml.safe_load(fp)
    return default if data is None else data


#
# Listing commands
#

def members(*args):
    """Print every registered member (extra arguments are ignored)."""
    for m in Member.query.all():
        print(m)


def groups(*args):
    """Print the id and name of every privilege group."""
    for g in Group.query.all():
        print(f"#{g.id} {g.name}")


def forums(*args):
    """Print every forum with its parent, prefix, name and description."""
    for f in Forum.query.all():
        parent = f"in {f.parent.url}" if f.parent is not None else "root"
        print(f"{f.url} ({parent}) [{f.prefix}]: {f.name}")
        print(f" {f.descr}")


def trophies(*args):
    """Print every trophy (titles included)."""
    for t in Trophy.query.all():
        print(t)


#
# Install and update commands
#

def update_groups():
    """Create or update privilege groups from data/groups.yaml.

    Each entry needs a "name"; "css", "descr" and "privs" (a
    whitespace-separated string of privilege names) are optional.
    """
    for group_info in _load_data("groups.yaml", []):
        name = group_info["name"]
        css = group_info.get("css", "")
        descr = group_info.get("descr", "")
        privs = group_info.get("privs", "").split()

        g = Group.query.filter_by(name=name).first()

        # Update an existing group
        if g is not None:
            changes = (
                g.css != css
                or g.description != descr
                or set(g.privs()) != set(privs)
            )
            g.css = css
            g.description = descr

            # Privileges are always rewritten from scratch, whether or not
            # they changed.
            for gpriv in GroupPrivilege.query.filter_by(gid=g.id):
                db.session.delete(gpriv)
            for priv in privs:
                db.session.add(GroupPrivilege(g, priv))

            if changes:
                db.session.add(g)
                print(f"[group] Updated {g.name}")

        # Create a new one
        else:
            g = Group(name, css, descr)
            db.session.add(g)
            # Commit before the privileges that reference the group are added
            db.session.commit()

            for priv in privs:
                db.session.add(GroupPrivilege(g, priv))
            print(f"[group] Created {g.name}")

    db.session.commit()


def update_forums():
    """Create, update and remove forums to match data/forums.yaml.

    The file maps forum URLs to a dict with "name", "prefix" and an optional
    "descr". A forum's parent is looked up from its URL with the last path
    component removed, so parents must be listed before their children.
    """
    # Get current forums
    existing = Forum.query.all()

    # Get the list of forums we want to end up with
    fr = _load_data("forums.yaml", {})

    for url, info in fr.items():
        if url == "/":
            parent = None
        else:
            # "/a/b" -> "/a"; a first-level forum ("/a") is a child of "/"
            parent_url = url.rsplit('/', 1)[0] or "/"

            parent = Forum.query.filter_by(url=parent_url).first()
            if parent is None:
                print(f"error: no parent with url {parent_url} for {url}")
                continue

        descr = info.get("descr", "")

        # Either change an existing forum endpoint (same URL) or create one
        f = Forum.query.filter_by(url=url).first()

        if f is not None:
            # No need to change the parent (same URL implies same parent)
            changes = (
                f.name != info["name"]
                or f.prefix != info["prefix"]
                or f.descr != descr
            )
            f.name = info["name"]
            f.prefix = info["prefix"]
            f.descr = descr

            if changes:
                print(f"[forum] Updated {url}")
        else:
            f = Forum(url, info["name"], info["prefix"], descr, parent)
            print(f"[forum] Created {url}")

        db.session.add(f)

    # Remove old forums
    for f in existing:
        if f.url not in fr:
            f.delete()
            print(f"[forum] Removed {f.url}")

    db.session.commit()


def update_trophies():
    """Create, update and remove trophies to match data/trophies.yaml.

    Each entry needs "name", "is_title" and "hidden"; "description" and
    "css" are optional. A trophy whose kind (plain trophy or title) differs
    from the file is deleted and created again with the requested kind.
    """
    existing = Trophy.query.all()

    # Get the list of what we want to obtain
    tr = {t["name"]: t for t in _load_data("trophies.yaml", [])}

    # Remove trophies that we don't want or that we want as a different type
    for t in existing:
        if t.name not in tr or isinstance(t, Title) != tr[t.name]["is_title"]:
            kind = "title" if isinstance(t, Title) else "trophy"
            print(f"[trophies] Deleted '{t.name}' ({kind})")
            db.session.delete(t)

    db.session.commit()

    # Add missing trophies
    for name, t in tr.items():
        description = t.get("description", "")
        css = t.get("css", "")
        trophy = Trophy.query.filter_by(name=name).first()

        if "css" in t and not t["is_title"]:
            print(f"[trophies] CSS on '{name}' is meaningless (not a title)")

        # Updating existing trophies
        if trophy is not None:
            is_title = isinstance(trophy, Title)
            changes = (
                trophy.description != description
                or trophy.hidden != t["hidden"]
                or (is_title and trophy.css != css)
            )
            trophy.description = description
            trophy.hidden = t["hidden"]
            if is_title:
                trophy.css = css

            if changes:
                print(f"[trophies] Updated '{name}'")

        # Add missing ones
        elif t["is_title"]:
            trophy = Title(name, description, t["hidden"], css)
            print(f"[trophies] Created '{name}' (title)")
        else:
            trophy = Trophy(name, description, t["hidden"])
            print(f"[trophies] Created '{name}' (trophy)")

        db.session.add(trophy)

    db.session.commit()


def generate_trophy_icons():
    """Cut data/trophies.png into one icon file per trophy.

    The source image is read as a grid of 26x26 cells; the inner 24x24 area
    of each cell is cropped, blank cells are skipped, and the rest is scaled
    to 48x48. Icons are paired with the trophies of data/trophies.yaml in
    file order and saved as static/images/trophies/<slugified-name>.png.
    """
    names = [slugify.slugify(t["name"]) for t in _load_data("trophies.yaml", [])]

    src = os.path.join(app.root_path, "data", "trophies.png")
    dst = os.path.join(app.root_path, "static", "images", "trophies")

    os.makedirs(dst, exist_ok=True)

    def trophy_iterator(img):
        for y in range(img.height // 26):
            for x in range(img.width // 26):
                icon = img.crop((26*x+1, 26*y+1, 26*x+25, 26*y+25))

                # Skip blank squares in the source image. getcolors()
                # returns None when the image holds more than `maxcolors`
                # colors (256 by default), so allow one color per pixel.
                colors = icon.getcolors(maxcolors=icon.width * icon.height)
                if len(colors) > 1:
                    yield icon.resize((48, 48), resample=Image.NEAREST)

    with Image.open(src) as img:
        for name, icon in zip(names, trophy_iterator(img)):
            icon.save(os.path.join(dst, f"{name}.png"))


def create_common_accounts():
    """Delete and recreate the 'PlanèteCasio' and 'GLaDOS' accounts.

    GLaDOS additionally receives the "edit.posts" and "shoutbox.ban"
    special privileges.
    """
    # Clean up common accounts
    for name in "PlanèteCasio GLaDOS".split():
        m = Member.query.filter_by(name=name).first()
        if m is not None:
            m.delete()

    # Recreate them
    def addgroup(member, group):
        g = Group.query.filter_by(name=group).first()
        if g is None:
            # The account is still created, just without this group
            print(f"warning: no group named '{group}', skipped")
            return
        member.groups.append(g)

    m = Member("PlanèteCasio", "contact@planet-casio.com", "nologin")
    addgroup(m, "Compte communautaire")
    addgroup(m, "No login")
    db.session.add(m)

    m = Member("GLaDOS", "glados@aperture.science", "nologin")
    m.xp = 1338
    addgroup(m, "Robot")
    addgroup(m, "No login")
    db.session.add(m)

    # Commit before the privileges that reference the member are added
    db.session.commit()

    db.session.add(SpecialPrivilege(m, "edit.posts"))
    db.session.add(SpecialPrivilege(m, "shoutbox.ban"))
    db.session.commit()


def add_group(member, group):
    """Add a member to a group.

    member -- member name; it is normalized before the lookup
    group  -- group id prefixed with '#', such as "#1"

    Prints an error and changes nothing when the group argument is
    malformed or when the group or the member does not exist.
    """
    if not group.startswith("#"):
        print(f"error: group id {group} should start with '#'")
        return

    try:
        gid = int(group[1:])
    except ValueError:
        print(f"error: group id {group} should be '#' followed by a number")
        return

    norm = unicode_names.normalize(member)

    g = Group.query.filter_by(id=gid).first()
    if g is None:
        # Appending None to m.groups would corrupt the relationship
        print(f"error: no group has id #{gid}")
        return

    m = Member.query.filter_by(norm=norm).first()
    if m is None:
        print(f"error: no member has a normalized name of '{norm}'")
        return

    m.groups.append(g)
    db.session.add(m)
    db.session.commit()


def enable_user(member):
    """Mark a member's email address as confirmed.

    member -- member name; it is normalized before the lookup
    """
    norm = unicode_names.normalize(member)
    m = Member.query.filter_by(norm=norm).first()
    if m is None:
        print(f"error: no member has a normalized name of '{norm}'")
        return

    m.email_confirmed = True
    db.session.add(m)
    db.session.commit()


#
# Main program
#

commands = {
    "exit": lambda: sys.exit(0),
    "help": lambda: print(help_msg),
    "members": members,
    "groups": groups,
    "forums": forums,
    "trophies": trophies,
    "update-groups": update_groups,
    "update-forums": update_forums,
    "update-trophies": update_trophies,
    "generate-trophy-icons": generate_trophy_icons,
    "create-common-accounts": create_common_accounts,
    "add-group": add_group,
    "enable-user": enable_user,
}


def execute(cmd):
    """Run a command given as a list of words: name, then its arguments.

    An unknown command, or arguments that do not fit the command, print an
    error instead of raising.
    """
    if not cmd:
        return

    name, args = cmd[0], cmd[1:]
    if name not in commands:
        print(f"error: unknown command '{name}'")
        return

    handler = commands[name]

    # Check the arguments against the handler's signature before calling it,
    # so a typo at the prompt does not kill the shell. Only the binding is
    # guarded: a TypeError raised inside the command itself still propagates.
    try:
        inspect.signature(handler).bind(*args)
    except TypeError:
        print(f"error: wrong arguments for '{name}' (see 'help')")
        return

    handler(*args)


# If a command is specified on the command-line, use it and do not prompt
if len(sys.argv) > 1:
    execute(sys.argv[1:])
    sys.exit(0)

# Otherwise, prompt interactively
else:
    print(help_msg)

    while True:
        try:
            cmd = input("@> ").split()
        except EOFError:
            print("^D")
            sys.exit(0)
        except KeyboardInterrupt:
            print("^C")
            sys.exit(0)

        if cmd:
            execute(cmd)