#! /usr/bin/python3 from app import app, db from app.models.user import Member, Group, GroupPrivilege from app.models.priv import SpecialPrivilege from app.models.trophy import Trophy, Title from app.models.forum import Forum from app.utils import unicode_names import os import sys import yaml import slugify import readline from PIL import Image help_msg = """ This is the Planète Casio master shell. Type 'exit' or C-D to leave. Type a category name to see a list of elements. Available categories are: 'members' Registered community members 'groups' Privilege groups 'trophies' Trophies 'trophy-members' Trophies owned by members 'forums' Forum tree For each category, an argument can be specified: * 'clear' will remove all entries in the category (destroys a lot of data!) * 'update' will update from the model in app/data/, when applicable (currently available on: forums, groups) Type 'create-common-accounts' to recreate 'Planète Casio' and 'GLaDOS' Type 'add-group #' to add a new member to a group. Type 'create-trophies' to reset trophies and titles and their icons. Type 'enable-user' to enable a email-disabled account. """ # # Category viewers # def members(*args): if args == ("clear",): for m in Member.query.all(): m.delete() db.session.commit() print("Removed all members.") return for m in Member.query.all(): print(m) def groups(*args): if args == ("clear",): for g in Group.query.all(): g.delete() db.session.commit() print("Removed all groups.") return if args == ("update",): update_groups() return for g in Group.query.all(): print(f"#{g.id} {g.name}") def trophies(*args): if args == ("clear",): for t in Trophy.query.all(): db.session.delete(t) db.session.commit() print("Removed all trophies.") return for t in Trophy.query.all(): print(t) def trophy_members(*args): for t in Trophy.query.all(): if t.owners == []: continue print(t) for m in t.owners: print(f" {m}") def forums(*args): if args == ("clear",): for f in Forum.query.all(): db.session.delete(f) db.session.commit() print("Removed all forums.") return if args == ("update",): update_forums() return for f in Forum.query.all(): parent = f"in {f.parent.url}" if f.parent is not None else "root" print(f"{f.url} ({parent}) [{f.prefix}]: {f.name}") print(f" {f.descr}") # # Creation and edition # def update_groups(): existing = Group.query.all() gr = [] with open(os.path.join(app.root_path, "data", "groups.yaml")) as fp: gr = yaml.safe_load(fp.read()) for group_info in gr: name = group_info["name"] css = group_info.get("css", "") descr = group_info.get("descr", "") privs = group_info.get("privs", "").split() g = Group.query.filter_by(name=name).first() # Update an existing group if g is not None: changes = (g.css != css) or (g.description != descr) or \ (set(g.privs()) != set(privs)) g.css = css g.description = descr for gpriv in GroupPrivilege.query.filter_by(gid=g.id): db.session.delete(gpriv) for priv in privs: db.session.add(GroupPrivilege(g, priv)) if changes: db.session.add(g) print(f"[group] Updated {g.name}") # Create a new one else: g = Group(name, css, descr) db.session.add(g) db.session.commit() for priv in privs: db.session.add(GroupPrivilege(g, priv)) print(f"[group] Created {g.name}") def create_common_accounts(): # Clean up common accounts for name in "PlanèteCasio GLaDOS".split(): m = Member.query.filter_by(name=name).first() if m is not None: m.delete() # Recreate theme def addgroup(member, group): g = Group.query.filter_by(name=group).first() if g is not None: member.groups.append(g) m = Member("PlanèteCasio", "contact@planet-casio.com", "nologin") addgroup(m, "Compte communautaire") addgroup(m, "No login") db.session.add(m) m = Member("GLaDOS", "glados@aperture.science", "nologin") m.xp = 1338 addgroup(m, "Robot") addgroup(m, "No login") db.session.add(m) db.session.commit() db.session.add(SpecialPrivilege(m, "edit-posts")) db.session.add(SpecialPrivilege(m, "shoutbox-ban")) db.session.commit() def create_trophies(): # Clean up trophies trophies("clear") # Create base trophies tr = [] with open(os.path.join(app.root_path, "data", "trophies.yaml")) as fp: tr = yaml.safe_load(fp.read()) for t in tr: description = t.get("description", "") if t["is_title"]: trophy = Title(t["name"], description, t["hidden"], t.get("css", "")) else: trophy = Trophy(t["name"], description, t["hidden"]) db.session.add(trophy) db.session.commit() print(f"Created {len(tr)} trophies.") # Create their icons create_trophies_icons() def create_trophies_icons(): tr = [] with open(os.path.join(app.root_path, "data", "trophies.yaml")) as fp: tr = yaml.safe_load(fp.read()) names = [slugify.slugify(t["name"]) for t in tr] src = os.path.join(app.root_path, "data", "trophies.png") dst = os.path.join(app.root_path, "static", "images", "trophies") try: os.mkdir(dst) except FileExistsError: pass img = Image.open(src) def trophy_iterator(img): for y in range(img.height // 26): for x in range(img.width // 26): icon = img.crop((26*x+1, 26*y+1, 26*x+25, 26*y+25)) # Skip blank squares in the source image if len(icon.getcolors()) > 1: yield icon.resize((48,48)) for (name, icon) in zip(names, trophy_iterator(img)): icon.save(os.path.join(dst, f"{name}.png")) def update_forums(): # Get current forums existing = Forum.query.all() # Get the list of forums we want to end up with fr = [] success = 0 with open(os.path.join(app.root_path, "data", "forums.yaml")) as fp: fr = yaml.safe_load(fp.read()) for url, info in fr.items(): if url == "/": parent = None else: parent_url = url.rsplit('/', 1)[0] if parent_url == "": parent_url = "/" parent = Forum.query.filter_by(url=parent_url).first() if parent is None: print(f"error: no parent with url {parent_url} for {url}") continue descr = info.get("descr", "") # Either change an existing forum endpoint (same URL) or create one f = Forum.query.filter_by(url=url).first() if f is not None: # No need to change the parent (same URL implies same parent) changes = (f.name != info["name"]) or (f.prefix != info["prefix"])\ or (f.descr != descr) f.name = info["name"] f.prefix = info["prefix"] f.descr = descr if changes: print(f"[forum] Updated {url}") else: f = Forum(url, info["name"], info["prefix"], descr, parent) print(f"[forum] Created {url}") db.session.add(f) success += 1 # Remove old forums for f in existing: if f.url not in fr: f.delete() print(f"[forum] Removed {f.url}") db.session.commit() def add_group(member, group): if group[0] != "#": print(f"error: group id {group} should start with '#'") return gid = int(group[1:]) norm = unicode_names.normalize(member) g = Group.query.filter_by(id=gid).first() m = Member.query.filter_by(norm=norm).first() if m is None: print(f"error: no member has a normalized name of '{norm}'") return m.groups.append(g) db.session.add(m) db.session.commit() def enable_user(member): norm = unicode_names.normalize(member) m = Member.query.filter_by(norm=norm).first() if m is None: print(f"error: no member has a normalized name of '{norm}'") return m.email_confirmed = True db.session.add(m) db.session.commit() # # Main program # commands = { "exit": lambda: sys.exit(0), "members": members, "groups": groups, "trophies": trophies, "trophy-members": trophy_members, "forums": forums, "create-common-accounts": create_common_accounts, "create-trophies": create_trophies, "create-trophies-icons": create_trophies_icons, "add-group": add_group, "enable-user": enable_user, } def execute(cmd): if cmd[0] not in commands: print(f"error: unknown command '{cmd[0]}'") else: commands[cmd[0]](*cmd[1:]) # If a command is specified on the command-line, use it and do not prompt if len(sys.argv) > 1: execute(sys.argv[1:]) sys.exit(0) # Otherwise, prompt interactively else: print(help_msg) while True: try: cmd = input("@> ").split() except EOFError: sys.exit(0) if cmd: execute(cmd)