#! /usr/bin/python3 from app import app, db from app.models.user import Member, Group, GroupPrivilege from app.models.priv import SpecialPrivilege from app.models.trophy import Trophy, Title from app.models.forum import Forum from app.models.tag import TagInformation from app.utils import unicode_names import os import sys import yaml import slugify import readline from PIL import Image from PIL.Image import Resampling help_msg = """ This is the Planète Casio master shell. Type 'exit' or C-D to leave. Type 'help' to print this message. Listing commands: members Show registered community members groups Show privilege groups forums Show forum tree trophies Show trophies tags Show tags Install and update commands: update-all Create or update all assets update-groups Create or update groups from app/data/ update-forums Create or update the forum tree from app/data/ update-trophies Create or update trophies update-tags Create or update tag information generate-trophy-icons Regenerate all trophy icons create-common-accounts Remove and recreate 'Planète Casio' and 'GLaDOS' add-group # Add to group # (presumably admins) enable-user Manually confirm member's email address """ # # Listing commands # def members(*args): for m in Member.query.all(): print(m) def groups(*args): for g in Group.query.all(): print(f"#{g.id} {g.name}") def forums(*args): for f in Forum.query.all(): parent = f"in {f.parent.url}" if f.parent is not None else "root" print(f"{f.url} ({parent}) [{f.prefix}]: {f.name}") print(f" {f.descr}") def trophies(*args): for t in Trophy.query.all(): print(t) def tags(*args): tags = TagInformation.query.all() for t in sorted(tags, key=lambda t: t.id): print(f"{t.id}: {t.pretty}") # # Install and update commands # def update_all(): update_groups() update_forums() update_forums() update_trophies() update_tags() generate_trophy_icons() def update_groups(): existing = Group.query.all() gr = [] with open(os.path.join(app.root_path, "data", "groups.yaml")) as fp: gr = yaml.safe_load(fp.read()) for group_info in gr: name = group_info["name"] css = group_info.get("css", "") descr = group_info.get("descr", "") privs = group_info.get("privs", "").split() g = Group.query.filter_by(name=name).first() # Update an existing group if g is not None: changes = (g.css != css) or (g.description != descr) or \ (set(g.privs()) != set(privs)) g.css = css g.description = descr for gpriv in g.privileges: db.session.delete(gpriv) for priv in privs: db.session.add(GroupPrivilege(g, priv)) if changes: db.session.add(g) print(f"[group] Updated {g.name}") # Create a new one else: g = Group(name, css, descr) db.session.add(g) db.session.commit() for priv in privs: db.session.add(GroupPrivilege(g, priv)) print(f"[group] Created {g.name}") db.session.commit() def update_forums(): # Get current forums existing = Forum.query.all() # Get the list of forums we want to end up with fr = [] success = 0 with open(os.path.join(app.root_path, "data", "forums.yaml")) as fp: fr = yaml.safe_load(fp.read()) for url, info in fr.items(): if url == "/": parent = None else: parent_url = url.rsplit('/', 1)[0] if parent_url == "": parent_url = "/" parent = Forum.query.filter_by(url=parent_url).first() if parent is None: print(f"error: no parent with url {parent_url} for {url}") continue descr = info.get("descr", "") # Either change an existing forum endpoint (same URL) or create one f = Forum.query.filter_by(url=url).first() if f is not None: # No need to change the parent (same URL implies same parent) changes = (f.name != info["name"]) or (f.prefix != info["prefix"])\ or (f.descr != descr) f.name = info["name"] f.prefix = info["prefix"] f.descr = descr if changes: print(f"[forum] Updated {url}") else: f = Forum(url, info["name"], info["prefix"], descr, parent) print(f"[forum] Created {url}") db.session.add(f) success += 1 # Remove old forums for f in existing: if f.url not in fr: f.delete() print(f"[forum] Removed {f.url}") db.session.commit() def update_trophies(): existing = Trophy.query.all() # Get the list of what we want to obtain with open(os.path.join(app.root_path, "data", "trophies.yaml")) as fp: tr = yaml.safe_load(fp.read()) tr = { t["name"]: t for t in tr } # Remove trophies that we don't want or that we want as a different type for t in existing: if t.name not in tr or isinstance(t, Title) != tr[t.name]["is_title"]: kind = "title" if isinstance(t, Title) else "trophy" print(f"[trophies] Deleted '{t.name}' ({kind})") db.session.delete(t) db.session.commit() # Add missing trophies for name, t in tr.items(): description = t.get("description", "") css = t.get("css", "") trophy = Trophy.query.filter_by(name=name).first() if "css" in t and not t["is_title"]: print(f"[trophies] CSS on '{name}' is meaningless (not a title)") # Updating existing trophies if trophy is not None: changes = (trophy.description != description) or \ (trophy.hidden != t["hidden"] or (isinstance(trophy,Title) and \ trophy.css != css)) trophy.description = description trophy.hidden = t["hidden"] if isinstance(trophy, Title): trophy.css = css if changes: print(f"[trophies] Updated '{name}'") # Add missing ones elif t["is_title"]: trophy = Title(name, description, t["hidden"], t.get("css", "")) print(f"[trophies] Created '{name}' (title)") else: trophy = Trophy(name, description, t["hidden"]) print(f"[trophies] Created '{name}' (trophy)") db.session.add(trophy) db.session.commit() def update_tags(): existing = TagInformation.query.all() with open(os.path.join(app.root_path, "data", "tags.yaml")) as fp: data = yaml.safe_load(fp.read()) tags = { ctgy + "." + name: data[ctgy][name] for ctgy in data for name in data[ctgy] } # Remove bad tags for t in existing: if t.id not in tags: print(f"[tags] Deleted '{t.id}'") db.session.delete(t) db.session.commit() for name, info in tags.items(): pretty = info.get("pretty", name) t = TagInformation.query.filter_by(id=name).first() if t is not None: changes = (t.pretty != pretty) t.pretty = pretty if changes: print(f"[tags] Updated '{name}'") else: t = TagInformation(name) t.pretty = pretty print(f"[tags] Created '{name}'") db.session.add(t) db.session.commit() def generate_trophy_icons(): tr = [] with open(os.path.join(app.root_path, "data", "trophies.yaml")) as fp: tr = yaml.safe_load(fp.read()) names = [slugify.slugify(t["name"]) for t in tr] src = os.path.join(app.root_path, "data", "trophies.png") dst = os.path.join(app.root_path, "static", "images", "trophies") try: os.mkdir(dst) except FileExistsError: pass img = Image.open(src) def trophy_iterator(img): for y in range(img.height // 26): for x in range(img.width // 26): icon = img.crop((26*x+1, 26*y+1, 26*x+25, 26*y+25)) # Skip blank squares in the source image if len(icon.getcolors()) > 1: yield icon.resize((48,48), resample=Resampling.NEAREST) for (name, icon) in zip(names, trophy_iterator(img)): icon.save(os.path.join(dst, f"{name}.png")) def create_common_accounts(): # Clean up common accounts for name in "PlanèteCasio GLaDOS".split(): m = Member.query.filter_by(name=name).first() if m is not None: m.delete() # Recreate theme def addgroup(member, group): g = Group.query.filter_by(name=group).first() if g is not None: member.groups.append(g) m = Member("PlanèteCasio", "contact@planet-casio.com", "nologin") addgroup(m, "Compte communautaire") addgroup(m, "No login") db.session.add(m) m = Member("GLaDOS", "glados@aperture.science", "nologin") m.xp = 1338 addgroup(m, "Robot") addgroup(m, "No login") db.session.add(m) db.session.commit() db.session.add(SpecialPrivilege(m, "edit.posts")) db.session.add(SpecialPrivilege(m, "shoutbox.ban")) db.session.commit() def add_group(member, group): if group[0] != "#": print(f"error: group id {group} should start with '#'") return gid = int(group[1:]) norm = unicode_names.normalize(member) g = Group.query.filter_by(id=gid).first() m = Member.query.filter_by(norm=norm).first() if m is None: print(f"error: no member has a normalized name of '{norm}'") return m.groups.append(g) db.session.add(m) db.session.commit() def enable_user(member): norm = unicode_names.normalize(member) m = Member.query.filter_by(norm=norm).first() if m is None: print(f"error: no member has a normalized name of '{norm}'") return m.email_confirmed = True db.session.add(m) db.session.commit() # # Main program # commands = { "exit": lambda: sys.exit(0), "help": lambda: print(help_msg), "members": members, "groups": groups, "forums": forums, "trophies": trophies, "tags": tags, "update-all": update_all, "update-groups": update_groups, "update-forums": update_forums, "update-trophies": update_trophies, "update-tags": update_tags, "generate-trophy-icons": generate_trophy_icons, "create-common-accounts": create_common_accounts, "add-group": add_group, "enable-user": enable_user, } def execute(cmd): if cmd[0] not in commands: print(f"error: unknown command '{cmd[0]}'") else: commands[cmd[0]](*cmd[1:]) # If a command is specified on the command-line, use it and do not prompt if len(sys.argv) > 1: execute(sys.argv[1:]) sys.exit(0) # Otherwise, prompt interactively else: print(help_msg) while True: try: cmd = input("@> ").split() except EOFError: print("^D") sys.exit(0) except KeyboardInterrupt: print("^C") sys.exit(0) if cmd: execute(cmd)